last_sample_time = start_time
realtime_update_interval = 1.0 # Update every second
last_realtime_update = start_time
+ interrupted = False
- while running_time < duration_sec:
- # Check if live collector wants to stop
- if hasattr(collector, 'running') and not collector.running:
- break
-
- current_time = time.perf_counter()
- if next_time < current_time:
- try:
- stack_frames = self.unwinder.get_stack_trace()
- collector.collect(stack_frames)
- except ProcessLookupError:
- duration_sec = current_time - start_time
+ try:
+ while running_time < duration_sec:
+ # Check if live collector wants to stop
+ if hasattr(collector, 'running') and not collector.running:
break
- except (RuntimeError, UnicodeDecodeError, MemoryError, OSError):
- collector.collect_failed_sample()
- errors += 1
- except Exception as e:
- if not self._is_process_running():
- break
- raise e from None
-
- # Track actual sampling intervals for real-time stats
- if num_samples > 0:
- actual_interval = current_time - last_sample_time
- self.sample_intervals.append(
- 1.0 / actual_interval
- ) # Convert to Hz
- self.total_samples += 1
-
- # Print real-time statistics if enabled
- if (
- self.realtime_stats
- and (current_time - last_realtime_update)
- >= realtime_update_interval
- ):
- self._print_realtime_stats()
- last_realtime_update = current_time
-
- last_sample_time = current_time
- num_samples += 1
- next_time += sample_interval_sec
+ current_time = time.perf_counter()
+ if next_time < current_time:
+ try:
+ stack_frames = self.unwinder.get_stack_trace()
+ collector.collect(stack_frames)
+ except ProcessLookupError:
+ duration_sec = current_time - start_time
+ break
+ except (RuntimeError, UnicodeDecodeError, MemoryError, OSError):
+ collector.collect_failed_sample()
+ errors += 1
+ except Exception as e:
+ if not self._is_process_running():
+ break
+ raise e from None
+
+ # Track actual sampling intervals for real-time stats
+ if num_samples > 0:
+ actual_interval = current_time - last_sample_time
+ self.sample_intervals.append(
+ 1.0 / actual_interval
+ ) # Convert to Hz
+ self.total_samples += 1
+
+ # Print real-time statistics if enabled
+ if (
+ self.realtime_stats
+ and (current_time - last_realtime_update)
+ >= realtime_update_interval
+ ):
+ self._print_realtime_stats()
+ last_realtime_update = current_time
+
+ last_sample_time = current_time
+ num_samples += 1
+ next_time += sample_interval_sec
+
+ running_time = time.perf_counter() - start_time
+ except KeyboardInterrupt:
+ interrupted = True
running_time = time.perf_counter() - start_time
+ print("Interrupted by user.")
# Clear real-time stats line if it was being displayed
if self.realtime_stats and len(self.sample_intervals) > 0:
collector.set_stats(self.sample_interval_usec, running_time, sample_rate, error_rate, mode=self.mode)
expected_samples = int(duration_sec / sample_interval_sec)
- if num_samples < expected_samples and not is_live_mode:
+ if num_samples < expected_samples and not is_live_mode and not interrupted:
print(
f"Warning: missed {expected_samples - num_samples} samples "
f"from the expected total of {expected_samples} "
self.assertIn("Warning: missed", result)
self.assertIn("samples from the expected total", result)
+ def test_sample_profiler_keyboard_interrupt(self):
+ mock_unwinder = mock.MagicMock()
+ mock_unwinder.get_stack_trace.side_effect = [
+ [
+ (
+ 1,
+ [
+ mock.MagicMock(
+ filename="test.py", lineno=10, funcname="test_func"
+ )
+ ],
+ )
+ ],
+ KeyboardInterrupt(),
+ ]
+
+ with mock.patch(
+ "_remote_debugging.RemoteUnwinder"
+ ) as mock_unwinder_class:
+ mock_unwinder_class.return_value = mock_unwinder
+ profiler = SampleProfiler(
+ pid=12345, sample_interval_usec=10000, all_threads=False
+ )
+ mock_collector = mock.MagicMock()
+ times = [0.0, 0.01, 0.02, 0.03, 0.04]
+ with mock.patch("time.perf_counter", side_effect=times):
+ with io.StringIO() as output:
+ with mock.patch("sys.stdout", output):
+ try:
+ profiler.sample(mock_collector, duration_sec=1.0)
+ except KeyboardInterrupt:
+ self.fail(
+ "KeyboardInterrupt was not handled by the profiler"
+ )
+ result = output.getvalue()
+ self.assertIn("Interrupted by user.", result)
+ self.assertIn("Captured", result)
+ self.assertIn("samples", result)
+ self.assertNotIn("Warning: missed", result)
+
@force_not_colorized_test_class
class TestPrintSampledStats(unittest.TestCase):