gh-114911: Add CPUStopwatch test helper (GH-114912)
author     Petr Viktorin <encukou@gmail.com>
           Wed, 28 Feb 2024 11:53:48 +0000 (12:53 +0100)
committer  GitHub <noreply@github.com>
           Wed, 28 Feb 2024 11:53:48 +0000 (12:53 +0100)
A few of our tests measure the time of CPU-bound operations, mainly
to avoid quadratic or worse behaviour.
Add a helper to ignore GC and time spent in other processes.
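
For reference, a minimal usage sketch of the new helper (based on the docstring
added to Lib/test/support/__init__.py below; the timed loop is illustrative
only):

    from test import support

    # Time a CPU-bound operation with GC disabled; process CPU time is used
    # where available, with a fallback to a monotonic clock (e.g. on WASM).
    with support.CPUStopwatch() as stopwatch:
        sum(i * i for i in range(10**6))

    elapsed = stopwatch.seconds                   # measured duration
    resolution = stopwatch.clock_info.resolution  # clock granularity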

Lib/test/support/__init__.py
Lib/test/test_int.py
Lib/test/test_re.py

Lib/test/support/__init__.py
index 1d03ec0f5bd12be0bb10e425c6f7b6fa4124835a..401b2ce1fe213cb380f5989ea2f9fa7884f67dad 100644 (file)
@@ -2381,6 +2381,46 @@ def sleeping_retry(timeout, err_msg=None, /,
         delay = min(delay * 2, max_delay)
 
 
+class CPUStopwatch:
+    """Context manager to roughly time a CPU-bound operation.
+
+    Disables GC. Uses CPU time if it can (i.e. excludes sleeps & time of
+    other processes).
+
+    N.B.:
+    - This *includes* time spent in other threads.
+    - Some systems only have a coarse resolution; check
+      stopwatch.clock_info.resolution if necessary.
+
+    Usage:
+
+    with CPUStopwatch() as stopwatch:
+        ...
+    elapsed = stopwatch.seconds
+    resolution = stopwatch.clock_info.resolution
+    """
+    def __enter__(self):
+        get_time = time.process_time
+        clock_info = time.get_clock_info('process_time')
+        if get_time() <= 0:  # some platforms like WASM lack process_time()
+            get_time = time.monotonic
+            clock_info = time.get_clock_info('monotonic')
+        self.context = disable_gc()
+        self.context.__enter__()
+        self.get_time = get_time
+        self.clock_info = clock_info
+        self.start_time = get_time()
+        return self
+
+    def __exit__(self, *exc):
+        try:
+            end_time = self.get_time()
+        finally:
+            result = self.context.__exit__(*exc)
+        self.seconds = end_time - self.start_time
+        return result
+
+
 @contextlib.contextmanager
 def adjust_int_max_str_digits(max_digits):
     """Temporarily change the integer string conversion length limit."""
Lib/test/test_int.py
index 0bf55facad9fedbf741ea68c6f2514ef7d19d7d2..47fc50a0e2034972569f3b9288afd634a2d0696d 100644 (file)
@@ -664,84 +664,78 @@ class IntStrDigitLimitsTests(unittest.TestCase):
         """Regression test: ensure we fail before performing O(N**2) work."""
         maxdigits = sys.get_int_max_str_digits()
         assert maxdigits < 50_000, maxdigits  # A test prerequisite.
-        get_time = time.process_time
-        if get_time() <= 0:  # some platforms like WASM lack process_time()
-            get_time = time.monotonic
 
         huge_int = int(f'0x{"c"*65_000}', base=16)  # 78268 decimal digits.
         digits = 78_268
-        with support.adjust_int_max_str_digits(digits):
-            start = get_time()
+        with (
+                support.adjust_int_max_str_digits(digits),
+                support.CPUStopwatch() as sw_convert):
             huge_decimal = str(huge_int)
-        seconds_to_convert = get_time() - start
         self.assertEqual(len(huge_decimal), digits)
         # Ensuring that we chose a slow enough conversion to measure.
         # It takes 0.1 seconds on a Zen based cloud VM in an opt build.
         # Some OSes have a low res 1/64s timer, skip if hard to measure.
-        if seconds_to_convert < 1/64:
+        if sw_convert.seconds < sw_convert.clock_info.resolution * 2:
             raise unittest.SkipTest('"slow" conversion took only '
-                                    f'{seconds_to_convert} seconds.')
+                                    f'{sw_convert.seconds} seconds.')
 
         # We test with the limit almost at the size needed to check performance.
         # The performant limit check is slightly fuzzy, give it some room.
         with support.adjust_int_max_str_digits(int(.995 * digits)):
-            with self.assertRaises(ValueError) as err:
-                start = get_time()
+            with (
+                    self.assertRaises(ValueError) as err,
+                    support.CPUStopwatch() as sw_fail_huge):
                 str(huge_int)
-            seconds_to_fail_huge = get_time() - start
         self.assertIn('conversion', str(err.exception))
-        self.assertLessEqual(seconds_to_fail_huge, seconds_to_convert/2)
+        self.assertLessEqual(sw_fail_huge.seconds, sw_convert.seconds/2)
 
         # Now we test that a conversion that would take 30x as long also fails
         # in a similarly fast fashion.
         extra_huge_int = int(f'0x{"c"*500_000}', base=16)  # 602060 digits.
-        with self.assertRaises(ValueError) as err:
-            start = get_time()
+        with (
+                self.assertRaises(ValueError) as err,
+                support.CPUStopwatch() as sw_fail_extra_huge):
             # If not limited, 8 seconds on the Zen based cloud VM.
             str(extra_huge_int)
-        seconds_to_fail_extra_huge = get_time() - start
         self.assertIn('conversion', str(err.exception))
-        self.assertLess(seconds_to_fail_extra_huge, seconds_to_convert/2)
+        self.assertLess(sw_fail_extra_huge.seconds, sw_convert.seconds/2)
 
     def test_denial_of_service_prevented_str_to_int(self):
         """Regression test: ensure we fail before performing O(N**2) work."""
         maxdigits = sys.get_int_max_str_digits()
         assert maxdigits < 100_000, maxdigits  # A test prerequisite.
-        get_time = time.process_time
-        if get_time() <= 0:  # some platforms like WASM lack process_time()
-            get_time = time.monotonic
 
         digits = 133700
         huge = '8'*digits
-        with support.adjust_int_max_str_digits(digits):
-            start = get_time()
+        with (
+                support.adjust_int_max_str_digits(digits),
+                support.CPUStopwatch() as sw_convert):
             int(huge)
-        seconds_to_convert = get_time() - start
         # Ensuring that we chose a slow enough conversion to measure.
         # It takes 0.1 seconds on a Zen based cloud VM in an opt build.
         # Some OSes have a low res 1/64s timer, skip if hard to measure.
-        if seconds_to_convert < 1/64:
+        if sw_convert.seconds < sw_convert.clock_info.resolution * 2:
             raise unittest.SkipTest('"slow" conversion took only '
-                                    f'{seconds_to_convert} seconds.')
+                                    f'{sw_convert.seconds} seconds.')
 
         with support.adjust_int_max_str_digits(digits - 1):
-            with self.assertRaises(ValueError) as err:
-                start = get_time()
+            with (
+                    self.assertRaises(ValueError) as err,
+                    support.CPUStopwatch() as sw_fail_huge):
                 int(huge)
-            seconds_to_fail_huge = get_time() - start
         self.assertIn('conversion', str(err.exception))
-        self.assertLessEqual(seconds_to_fail_huge, seconds_to_convert/2)
+        self.assertLessEqual(sw_fail_huge.seconds, sw_convert.seconds/2)
 
         # Now we test that a conversion that would take 30x as long also fails
         # in a similarly fast fashion.
         extra_huge = '7'*1_200_000
-        with self.assertRaises(ValueError) as err:
-            start = get_time()
+        with (
+                self.assertRaises(ValueError) as err,
+                support.CPUStopwatch() as sw_fail_extra_huge):
             # If not limited, 8 seconds in the Zen based cloud VM.
             int(extra_huge)
-        seconds_to_fail_extra_huge = get_time() - start
         self.assertIn('conversion', str(err.exception))
-        self.assertLessEqual(seconds_to_fail_extra_huge, seconds_to_convert/2)
+        self.assertLessEqual(sw_fail_extra_huge.seconds, sw_convert.seconds/2)
 
     def test_power_of_two_bases_unlimited(self):
         """The limit does not apply to power of 2 bases."""
Lib/test/test_re.py
index 993a7d6e264a1f4a10841a8069b33f5795d1b0fb..b1ac22c28cf7c136028b0aefe162215cb898caa1 100644 (file)
@@ -1,7 +1,7 @@
 from test.support import (gc_collect, bigmemtest, _2G,
                           cpython_only, captured_stdout,
                           check_disallow_instantiation, is_emscripten, is_wasi,
-                          warnings_helper, SHORT_TIMEOUT)
+                          warnings_helper, SHORT_TIMEOUT, CPUStopwatch)
 import locale
 import re
 import string
@@ -2284,17 +2284,16 @@ class ReTests(unittest.TestCase):
 
     def test_search_anchor_at_beginning(self):
         s = 'x'*10**7
-        start = time.perf_counter()
-        for p in r'\Ay', r'^y':
-            self.assertIsNone(re.search(p, s))
-            self.assertEqual(re.split(p, s), [s])
-            self.assertEqual(re.findall(p, s), [])
-            self.assertEqual(list(re.finditer(p, s)), [])
-            self.assertEqual(re.sub(p, '', s), s)
-        t = time.perf_counter() - start
+        with CPUStopwatch() as stopwatch:
+            for p in r'\Ay', r'^y':
+                self.assertIsNone(re.search(p, s))
+                self.assertEqual(re.split(p, s), [s])
+                self.assertEqual(re.findall(p, s), [])
+                self.assertEqual(list(re.finditer(p, s)), [])
+                self.assertEqual(re.sub(p, '', s), s)
         # Without optimization it takes 1 second on my computer.
         # With optimization -- 0.0003 seconds.
-        self.assertLess(t, 0.1)
+        self.assertLess(stopwatch.seconds, 0.1)
 
     def test_possessive_quantifiers(self):
         """Test Possessive Quantifiers