[3.11] Add test.support.busy_retry() (#93770) (#110341)
author Victor Stinner <vstinner@python.org>
Wed, 4 Oct 2023 10:58:49 +0000 (12:58 +0200)
committer GitHub <noreply@github.com>
Wed, 4 Oct 2023 10:58:49 +0000 (12:58 +0200)
Add test.support.busy_retry() (#93770)

Add busy_retry() and sleeping_retry() functions to test.support.

(cherry picked from commit 7e9eaad864349d2cfd4c9ffc4453aba03b2cbc16)
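
The recurring transformation across the touched test files replaces hand-rolled deadline loops with the new helpers. A minimal sketch of the before/after pattern, assuming a hypothetical check() predicate that is not part of the patch:

    import time
    from test import support

    def check():
        # Hypothetical condition the test is waiting for.
        return True

    # Before: each test tracked its own deadline and sleep interval.
    deadline = time.monotonic() + support.SHORT_TIMEOUT
    while not check():
        if time.monotonic() > deadline:
            raise AssertionError("timeout")
        time.sleep(0.1)

    # After: the helper owns the deadline, the timeout error and, for
    # sleeping_retry(), the exponential backoff between iterations.
    for _ in support.sleeping_retry(support.SHORT_TIMEOUT, "timeout"):
        if check():
            break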

12 files changed:
Doc/library/test.rst
Lib/test/_test_multiprocessing.py
Lib/test/fork_wait.py
Lib/test/support/__init__.py
Lib/test/test__xxsubinterpreters.py
Lib/test/test_concurrent_futures/test_init.py
Lib/test/test_multiprocessing_main_handling.py
Lib/test/test_signal.py
Lib/test/test_ssl.py
Lib/test/test_support.py
Lib/test/test_wait3.py
Lib/test/test_wait4.py

index 427953e0077aab6cf7c743ed3e4460a334f332e6..d3eb0ae00a08dcd675b30632b966c2df43bf9388 100644 (file)
@@ -404,6 +404,51 @@ The :mod:`test.support` module defines the following constants:
 
 The :mod:`test.support` module defines the following functions:
 
+.. function:: busy_retry(timeout, err_msg=None, /, *, error=True)
+
+   Run the loop body until ``break`` stops the loop.
+
+   After *timeout* seconds, raise an :exc:`AssertionError` if *error* is true,
+   or just stop the loop if *error* is false.
+
+   Example::
+
+       for _ in support.busy_retry(support.SHORT_TIMEOUT):
+           if check():
+               break
+
+   Example of error=False usage::
+
+       for _ in support.busy_retry(support.SHORT_TIMEOUT, error=False):
+           if check():
+               break
+       else:
+           raise RuntimeError('my custom error')
+
+.. function:: sleeping_retry(timeout, err_msg=None, /, *, init_delay=0.010, max_delay=1.0, error=True)
+
+   Wait strategy that applies exponential backoff.
+
+   Run the loop body until ``break`` stops the loop. Sleep at each loop
+   iteration, but not at the first iteration. The sleep delay is doubled at
+   each iteration (up to *max_delay* seconds).
+
+   See the :func:`busy_retry` documentation for a description of the parameters.
+
+   Example raising an exception after SHORT_TIMEOUT seconds::
+
+       for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+           if check():
+               break
+
+   Example of error=False usage::
+
+       for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
+           if check():
+               break
+       else:
+           raise RuntimeError('my custom error')
+
 .. function:: is_resource_enabled(resource)
 
    Return ``True`` if *resource* is enabled and available. The list of
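
With the defaults documented above for sleeping_retry() (init_delay=0.010, max_delay=1.0), the sleep delay doubles each iteration until it hits the cap, and the first iteration does not sleep at all. A small illustrative computation of that schedule (not part of the patch):

    init_delay, max_delay = 0.010, 1.0

    delay, schedule = init_delay, []
    for _ in range(8):
        schedule.append(delay)
        delay = min(delay * 2, max_delay)

    print(schedule)   # [0.01, 0.02, 0.04, 0.08, 0.16, 0.32, 0.64, 1.0]

So a waiter polls quickly at first (about 1.3 seconds of cumulative sleep over the first seven iterations) and then settles to roughly one check per second until the timeout expires.
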
index 6249062639b8d4e1ce685d6a93be52e10d711a03..6459757eb93d51929ea6d88f61e302b45da3fc1c 100644 (file)
@@ -4376,18 +4376,13 @@ class _TestSharedMemory(BaseTestCase):
             p.terminate()
             p.wait()
 
-            deadline = time.monotonic() + support.LONG_TIMEOUT
-            t = 0.1
-            while time.monotonic() < deadline:
-                time.sleep(t)
-                t = min(t*2, 5)
+            err_msg = ("A SharedMemory segment was leaked after "
+                       "a process was abruptly terminated")
+            for _ in support.sleeping_retry(support.LONG_TIMEOUT, err_msg):
                 try:
                     smm = shared_memory.SharedMemory(name, create=False)
                 except FileNotFoundError:
                     break
-            else:
-                raise AssertionError("A SharedMemory segment was leaked after"
-                                     " a process was abruptly terminated.")
 
             if os.name == 'posix':
                 # Without this line it was raising warnings like:
@@ -5458,9 +5453,10 @@ class TestResourceTracker(unittest.TestCase):
                 p.terminate()
                 p.wait()
 
-                deadline = time.monotonic() + support.LONG_TIMEOUT
-                while time.monotonic() < deadline:
-                    time.sleep(.5)
+                err_msg = (f"A {rtype} resource was leaked after a process was "
+                           f"abruptly terminated")
+                for _ in support.sleeping_retry(support.SHORT_TIMEOUT,
+                                                  err_msg):
                     try:
                         _resource_unlink(name2, rtype)
                     except OSError as e:
@@ -5468,10 +5464,7 @@ class TestResourceTracker(unittest.TestCase):
                         # EINVAL
                         self.assertIn(e.errno, (errno.ENOENT, errno.EINVAL))
                         break
-                else:
-                    raise AssertionError(
-                        f"A {rtype} resource was leaked after a process was "
-                        f"abruptly terminated.")
+
                 err = p.stderr.read().decode('utf-8')
                 p.stderr.close()
                 expected = ('resource_tracker: There appear to be 2 leaked {} '
@@ -5707,18 +5700,17 @@ class TestSyncManagerTypes(unittest.TestCase):
         # but this can take a bit on slow machines, so wait a few seconds
         # if there are other children too (see #17395).
         join_process(self.proc)
+
         start_time = time.monotonic()
-        t = 0.01
-        while len(multiprocessing.active_children()) > 1:
-            time.sleep(t)
-            t *= 2
-            dt = time.monotonic() - start_time
-            if dt >= 5.0:
-                test.support.environment_altered = True
-                support.print_warning(f"multiprocessing.Manager still has "
-                                      f"{multiprocessing.active_children()} "
-                                      f"active children after {dt} seconds")
+        for _ in support.sleeping_retry(5.0, error=False):
+            if len(multiprocessing.active_children()) <= 1:
                 break
+        else:
+            dt = time.monotonic() - start_time
+            support.environment_altered = True
+            support.print_warning(f"multiprocessing.Manager still has "
+                                  f"{multiprocessing.active_children()} "
+                                  f"active children after {dt:.1f} seconds")
 
     def run_worker(self, worker, obj):
         self.proc = multiprocessing.Process(target=worker, args=(obj, ))
@@ -6031,17 +6023,15 @@ class ManagerMixin(BaseMixin):
         # but this can take a bit on slow machines, so wait a few seconds
         # if there are other children too (see #17395)
         start_time = time.monotonic()
-        t = 0.01
-        while len(multiprocessing.active_children()) > 1:
-            time.sleep(t)
-            t *= 2
-            dt = time.monotonic() - start_time
-            if dt >= 5.0:
-                test.support.environment_altered = True
-                support.print_warning(f"multiprocessing.Manager still has "
-                                      f"{multiprocessing.active_children()} "
-                                      f"active children after {dt} seconds")
+        for _ in support.sleeping_retry(5.0, error=False):
+            if len(multiprocessing.active_children()) <= 1:
                 break
+        else:
+            dt = time.monotonic() - start_time
+            support.environment_altered = True
+            support.print_warning(f"multiprocessing.Manager still has "
+                                  f"{multiprocessing.active_children()} "
+                                  f"active children after {dt:.1f} seconds")
 
         gc.collect()                       # do garbage collection
         if cls.manager._number_of_objects() != 0:
index 4d3dbd8e83f5a9bcdf463f2cfc0f1b02dedb9567..c565f593559483aab85b57fc625e92ed7c31e44c 100644 (file)
@@ -54,10 +54,8 @@ class ForkWait(unittest.TestCase):
             self.threads.append(thread)
 
         # busy-loop to wait for threads
-        deadline = time.monotonic() + support.SHORT_TIMEOUT
-        while len(self.alive) < NUM_THREADS:
-            time.sleep(0.1)
-            if deadline < time.monotonic():
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
+            if len(self.alive) >= NUM_THREADS:
                 break
 
         a = sorted(self.alive.keys())
index 2e6518cf241f14ebffc85f54cc97f2874dbf7ae2..6ee525c061f62496a5cf974851f32f041b0989ba 100644 (file)
@@ -2324,6 +2324,82 @@ def requires_venv_with_pip():
     return unittest.skipUnless(ctypes, 'venv: pip requires ctypes')
 
 
+def busy_retry(timeout, err_msg=None, /, *, error=True):
+    """
+    Run the loop body until "break" stops the loop.
+
+    After *timeout* seconds, raise an AssertionError if *error* is true,
+    or just stop if *error* is false.
+
+    Example:
+
+        for _ in support.busy_retry(support.SHORT_TIMEOUT):
+            if check():
+                break
+
+    Example of error=False usage:
+
+        for _ in support.busy_retry(support.SHORT_TIMEOUT, error=False):
+            if check():
+                break
+        else:
+            raise RuntimeError('my custom error')
+
+    """
+    if timeout <= 0:
+        raise ValueError("timeout must be greater than zero")
+
+    start_time = time.monotonic()
+    deadline = start_time + timeout
+
+    while True:
+        yield
+
+        if time.monotonic() >= deadline:
+            break
+
+    if error:
+        dt = time.monotonic() - start_time
+        msg = f"timeout ({dt:.1f} seconds)"
+        if err_msg:
+            msg = f"{msg}: {err_msg}"
+        raise AssertionError(msg)
+
+
+def sleeping_retry(timeout, err_msg=None, /,
+                     *, init_delay=0.010, max_delay=1.0, error=True):
+    """
+    Wait strategy that applies exponential backoff.
+
+    Run the loop body until "break" stops the loop. Sleep at each loop
+    iteration, but not at the first iteration. The sleep delay is doubled at
+    each iteration (up to *max_delay* seconds).
+
+    See the busy_retry() documentation for a description of the parameters.
+
+    Example raising an exception after SHORT_TIMEOUT seconds:
+
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+            if check():
+                break
+
+    Example of error=False usage:
+
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
+            if check():
+                break
+        else:
+            raise RuntimeError('my custom error')
+    """
+
+    delay = init_delay
+    for _ in busy_retry(timeout, err_msg, error=error):
+        yield
+
+        time.sleep(delay)
+        delay = min(delay * 2, max_delay)
+
+
 @contextlib.contextmanager
 def adjust_int_max_str_digits(max_digits):
     """Temporarily change the integer string conversion length limit."""
index 5d0ed9ea14ac7fdb7806941031047e6baeb1b364..f20aae8e21c66fafff1788ec28df73c0453c43eb 100644 (file)
@@ -45,12 +45,11 @@ def _wait_for_interp_to_run(interp, timeout=None):
     # run subinterpreter earlier than the main thread in multiprocess.
     if timeout is None:
         timeout = support.SHORT_TIMEOUT
-    start_time = time.monotonic()
-    deadline = start_time + timeout
-    while not interpreters.is_running(interp):
-        if time.monotonic() > deadline:
-            raise RuntimeError('interp is not running')
-        time.sleep(0.010)
+    for _ in support.sleeping_retry(timeout, error=False):
+        if interpreters.is_running(interp):
+            break
+    else:
+        raise RuntimeError('interp is not running')
 
 
 @contextlib.contextmanager
index 138d6ee545fd9282601c3a5ed13666e213a27be2..ebab9437b9c4e150a2c220f68c00a3a9c8f513d0 100644 (file)
@@ -78,11 +78,10 @@ class FailingInitializerMixin(ExecutorMixin):
                     future.result()
 
             # At some point, the executor should break
-            t1 = time.monotonic()
-            while not self.executor._broken:
-                if time.monotonic() - t1 > 5:
-                    self.fail("executor not broken after 5 s.")
-                time.sleep(0.01)
+            for _ in support.sleeping_retry(5, "executor not broken"):
+                if self.executor._broken:
+                    break
+
             # ... and from this point submit() is guaranteed to fail
             with self.assertRaises(BrokenExecutor):
                 self.executor.submit(get_init_status)
index 510d8d3a7597e195bf57312b9b7d802211391f2b..35e9cd64fa6c01c3b77d30dd6c1bbff6a57c93db 100644 (file)
@@ -40,6 +40,7 @@ test_source = """\
 import sys
 import time
 from multiprocessing import Pool, set_start_method
+from test import support
 
 # We use this __main__ defined function in the map call below in order to
 # check that multiprocessing is correctly running the unguarded
@@ -59,13 +60,11 @@ if __name__ == '__main__':
     results = []
     with Pool(5) as pool:
         pool.map_async(f, [1, 2, 3], callback=results.extend)
-        start_time = time.monotonic()
-        while not results:
-            time.sleep(0.05)
-            # up to 1 min to report the results
-            dt = time.monotonic() - start_time
-            if dt > 60.0:
-                raise RuntimeError("Timed out waiting for results (%.1f sec)" % dt)
+
+        # up to 1 min to report the results
+        for _ in support.sleeping_retry(60, "Timed out waiting for results"):
+            if results:
+                break
 
     results.sort()
     print(start_method, "->", results)
@@ -86,19 +85,17 @@ if __name__ != "__main__":
 import sys
 import time
 from multiprocessing import Pool, set_start_method
+from test import support
 
 start_method = sys.argv[1]
 set_start_method(start_method)
 results = []
 with Pool(5) as pool:
     pool.map_async(int, [1, 4, 9], callback=results.extend)
-    start_time = time.monotonic()
-    while not results:
-        time.sleep(0.05)
-        # up to 1 min to report the results
-        dt = time.monotonic() - start_time
-        if dt > 60.0:
-            raise RuntimeError("Timed out waiting for results (%.1f sec)" % dt)
+    # up to 1 min to report the results
+    for _ in support.sleeping_retry(60, "Timed out waiting for results"):
+        if results:
+            break
 
 results.sort()
 print(start_method, "->", results)
index f6632896100a66261cbbd6c9593d2aa72683703f..a00c7e539400d99ff0b1855db051189990e2e5dd 100644 (file)
@@ -813,13 +813,14 @@ class ItimerTest(unittest.TestCase):
         signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
         signal.setitimer(self.itimer, 0.3, 0.2)
 
-        start_time = time.monotonic()
-        while time.monotonic() - start_time < 60.0:
+        for _ in support.busy_retry(60.0, error=False):
             # use up some virtual time by doing real work
             _ = pow(12345, 67890, 10000019)
             if signal.getitimer(self.itimer) == (0.0, 0.0):
-                break # sig_vtalrm handler stopped this itimer
-        else: # Issue 8424
+                # sig_vtalrm handler stopped this itimer
+                break
+        else:
+            # bpo-8424
             self.skipTest("timeout: likely cause: machine too slow or load too "
                           "high")
 
@@ -833,13 +834,14 @@ class ItimerTest(unittest.TestCase):
         signal.signal(signal.SIGPROF, self.sig_prof)
         signal.setitimer(self.itimer, 0.2, 0.2)
 
-        start_time = time.monotonic()
-        while time.monotonic() - start_time < 60.0:
+        for _ in support.busy_retry(60.0, error=False):
             # do some work
             _ = pow(12345, 67890, 10000019)
             if signal.getitimer(self.itimer) == (0.0, 0.0):
-                break # sig_prof handler stopped this itimer
-        else: # Issue 8424
+                # sig_prof handler stopped this itimer
+                break
+        else:
+            # bpo-8424
             self.skipTest("timeout: likely cause: machine too slow or load too "
                           "high")
 
@@ -1308,8 +1310,6 @@ class StressTest(unittest.TestCase):
         self.setsig(signal.SIGALRM, handler)  # for ITIMER_REAL
 
         expected_sigs = 0
-        deadline = time.monotonic() + support.SHORT_TIMEOUT
-
         while expected_sigs < N:
             # Hopefully the SIGALRM will be received somewhere during
             # initial processing of SIGUSR1.
@@ -1318,8 +1318,9 @@ class StressTest(unittest.TestCase):
 
             expected_sigs += 2
             # Wait for handlers to run to avoid signal coalescing
-            while len(sigs) < expected_sigs and time.monotonic() < deadline:
-                time.sleep(1e-5)
+            for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
+                if len(sigs) >= expected_sigs:
+                    break
 
         # All ITIMER_REAL signals should have been delivered to the
         # Python handler
index 965c2728914b502d47363c49220e4ca67c03843b..1bfec237484124e950ad555643ddf567ceccf3e2 100644 (file)
@@ -2279,11 +2279,8 @@ class SimpleBackgroundTests(unittest.TestCase):
         # A simple IO loop. Call func(*args) depending on the error we get
         # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
         timeout = kwargs.get('timeout', support.SHORT_TIMEOUT)
-        deadline = time.monotonic() + timeout
         count = 0
-        while True:
-            if time.monotonic() > deadline:
-                self.fail("timeout")
+        for _ in support.busy_retry(timeout):
             errno = None
             count += 1
             try:
index 01ba88ce42c5f6e6bf0249fba2f1dd7a37a71c94..fec96bef4bbd2233bcda444eaa3d0e3d050c7849 100644 (file)
@@ -10,7 +10,6 @@ import sys
 import sysconfig
 import tempfile
 import textwrap
-import time
 import unittest
 import warnings
 
@@ -462,18 +461,12 @@ class TestSupport(unittest.TestCase):
             # child process: do nothing, just exit
             os._exit(0)
 
-        t0 = time.monotonic()
-        deadline = time.monotonic() + support.SHORT_TIMEOUT
-
         was_altered = support.environment_altered
         try:
             support.environment_altered = False
             stderr = io.StringIO()
 
-            while True:
-                if time.monotonic() > deadline:
-                    self.fail("timeout")
-
+            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                 with support.swap_attr(support.print_warning, 'orig_stderr', stderr):
                     support.reap_children()
 
@@ -482,9 +475,6 @@ class TestSupport(unittest.TestCase):
                 if support.environment_altered:
                     break
 
-                # loop until the child process completed
-                time.sleep(0.100)
-
             msg = "Warning -- reap_children() reaped child process %s" % pid
             self.assertIn(msg, stderr.getvalue())
             self.assertTrue(support.environment_altered)
index 4ec7690ac19bbf3d6ce503426e1c938a1d517279..15d66ae825abf53f786ac3656db324b4d532590a 100644 (file)
@@ -4,7 +4,6 @@
 import os
 import subprocess
 import sys
-import time
 import unittest
 from test.fork_wait import ForkWait
 from test import support
@@ -20,14 +19,12 @@ class Wait3Test(ForkWait):
         # This many iterations can be required, since some previously run
         # tests (e.g. test_ctypes) could have spawned a lot of children
         # very quickly.
-        deadline = time.monotonic() + support.SHORT_TIMEOUT
-        while time.monotonic() <= deadline:
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
             # wait3() shouldn't hang, but some of the buildbots seem to hang
             # in the forking tests.  This is an attempt to fix the problem.
             spid, status, rusage = os.wait3(os.WNOHANG)
             if spid == cpid:
                 break
-            time.sleep(0.1)
 
         self.assertEqual(spid, cpid)
         self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
index 24f1aaec60c56beeffeb968fcca3f99c11f8dc3c..f66c0db1c20e6e628cd380ed377a4647a7c14d47 100644 (file)
@@ -2,7 +2,6 @@
 """
 
 import os
-import time
 import sys
 import unittest
 from test.fork_wait import ForkWait
@@ -22,14 +21,12 @@ class Wait4Test(ForkWait):
             # Issue #11185: wait4 is broken on AIX and will always return 0
             # with WNOHANG.
             option = 0
-        deadline = time.monotonic() + support.SHORT_TIMEOUT
-        while time.monotonic() <= deadline:
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
             # wait4() shouldn't hang, but some of the buildbots seem to hang
             # in the forking tests.  This is an attempt to fix the problem.
             spid, status, rusage = os.wait4(cpid, option)
             if spid == cpid:
                 break
-            time.sleep(0.1)
         self.assertEqual(spid, cpid)
         self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
         self.assertTrue(rusage)