The :mod:`test.support` module defines the following functions:
+.. function:: busy_retry(timeout, err_msg=None, /, *, error=True)
+
+   Run the loop body until ``break`` stops the loop.
+
+   After *timeout* seconds, raise an :exc:`AssertionError` if *error* is true,
+   or just stop the loop if *error* is false.
+
+   Example::
+
+       for _ in support.busy_retry(support.SHORT_TIMEOUT):
+           if check():
+               break
+
+   Example of ``error=False`` usage::
+
+       for _ in support.busy_retry(support.SHORT_TIMEOUT, error=False):
+           if check():
+               break
+       else:
+           raise RuntimeError('my custom error')
+
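+   The optional *err_msg* is appended to the :exc:`AssertionError` message
+   raised on timeout. A minimal sketch (``check()`` stands in for whatever
+   condition is being polled)::
+
+       for _ in support.busy_retry(support.SHORT_TIMEOUT,
+                                   "check() never succeeded"):
+           if check():
+               break
+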
+.. function:: sleeping_retry(timeout, err_msg=None, /, *, init_delay=0.010, max_delay=1.0, error=True)
+
+   Wait strategy that applies exponential backoff.
+
+   Run the loop body until ``break`` stops the loop. Sleep at each loop
+   iteration, but not at the first iteration. The sleep delay is doubled at
+   each iteration (up to *max_delay* seconds).
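+   For example, with the default parameters the successive sleep delays are
+   0.010, 0.020, 0.040, ..., 0.640 seconds, and then 1.0 second for each
+   subsequent iteration.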
+
+   See the :func:`busy_retry` documentation for the *timeout*, *err_msg*
+   and *error* parameters.
+
+   Example raising an exception after ``SHORT_TIMEOUT`` seconds::
+
+       for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+           if check():
+               break
+
+   Example of ``error=False`` usage::
+
+       for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
+           if check():
+               break
+       else:
+           raise RuntimeError('my custom error')
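+
+   The backoff itself can be tuned via *init_delay* and *max_delay*; for
+   instance, a sketch that starts polling every 100 ms and never sleeps
+   longer than half a second (``check()`` is again a stand-in)::
+
+       for _ in support.sleeping_retry(support.SHORT_TIMEOUT,
+                                       init_delay=0.1, max_delay=0.5):
+           if check():
+               break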
+
.. function:: is_resource_enabled(resource)

   Return ``True`` if *resource* is enabled and available. The list of
        p.terminate()
        p.wait()
-        deadline = time.monotonic() + support.LONG_TIMEOUT
-        t = 0.1
-        while time.monotonic() < deadline:
-            time.sleep(t)
-            t = min(t*2, 5)
+        err_msg = ("A SharedMemory segment was leaked after "
+                   "a process was abruptly terminated")
+        for _ in support.sleeping_retry(support.LONG_TIMEOUT, err_msg):
            try:
                smm = shared_memory.SharedMemory(name, create=False)
            except FileNotFoundError:
                break
-        else:
-            raise AssertionError("A SharedMemory segment was leaked after"
-                                 " a process was abruptly terminated.")
        if os.name == 'posix':
            # Without this line it was raising warnings like:
        p.terminate()
        p.wait()
-        deadline = time.monotonic() + support.LONG_TIMEOUT
-        while time.monotonic() < deadline:
-            time.sleep(.5)
+        err_msg = (f"A {rtype} resource was leaked after a process was "
+                   f"abruptly terminated")
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT,
+                                        err_msg):
            try:
                _resource_unlink(name2, rtype)
            except OSError as e:
                # EINVAL
                self.assertIn(e.errno, (errno.ENOENT, errno.EINVAL))
                break
-        else:
-            raise AssertionError(
-                f"A {rtype} resource was leaked after a process was "
-                f"abruptly terminated.")
+
        err = p.stderr.read().decode('utf-8')
        p.stderr.close()
        expected = ('resource_tracker: There appear to be 2 leaked {} '
        # but this can take a bit on slow machines, so wait a few seconds
        # if there are other children too (see #17395).
        join_process(self.proc)
+
        start_time = time.monotonic()
-        t = 0.01
-        while len(multiprocessing.active_children()) > 1:
-            time.sleep(t)
-            t *= 2
-            dt = time.monotonic() - start_time
-            if dt >= 5.0:
-                test.support.environment_altered = True
-                support.print_warning(f"multiprocessing.Manager still has "
-                                      f"{multiprocessing.active_children()} "
-                                      f"active children after {dt} seconds")
+        for _ in support.sleeping_retry(5.0, error=False):
+            if len(multiprocessing.active_children()) <= 1:
                break
+        else:
+            dt = time.monotonic() - start_time
+            support.environment_altered = True
+            support.print_warning(f"multiprocessing.Manager still has "
+                                  f"{multiprocessing.active_children()} "
+                                  f"active children after {dt:.1f} seconds")
    def run_worker(self, worker, obj):
        self.proc = multiprocessing.Process(target=worker, args=(obj, ))
        # but this can take a bit on slow machines, so wait a few seconds
        # if there are other children too (see #17395)
        start_time = time.monotonic()
-        t = 0.01
-        while len(multiprocessing.active_children()) > 1:
-            time.sleep(t)
-            t *= 2
-            dt = time.monotonic() - start_time
-            if dt >= 5.0:
-                test.support.environment_altered = True
-                support.print_warning(f"multiprocessing.Manager still has "
-                                      f"{multiprocessing.active_children()} "
-                                      f"active children after {dt} seconds")
+        for _ in support.sleeping_retry(5.0, error=False):
+            if len(multiprocessing.active_children()) <= 1:
                break
+        else:
+            dt = time.monotonic() - start_time
+            support.environment_altered = True
+            support.print_warning(f"multiprocessing.Manager still has "
+                                  f"{multiprocessing.active_children()} "
+                                  f"active children after {dt:.1f} seconds")
        gc.collect() # do garbage collection
        if cls.manager._number_of_objects() != 0:
            self.threads.append(thread)
        # busy-loop to wait for threads
-        deadline = time.monotonic() + support.SHORT_TIMEOUT
-        while len(self.alive) < NUM_THREADS:
-            time.sleep(0.1)
-            if deadline < time.monotonic():
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
+            if len(self.alive) >= NUM_THREADS:
                break
        a = sorted(self.alive.keys())
    return unittest.skipUnless(ctypes, 'venv: pip requires ctypes')
+def busy_retry(timeout, err_msg=None, /, *, error=True):
+    """
+    Run the loop body until "break" stops the loop.
+
+    After *timeout* seconds, raise an AssertionError if *error* is true,
+    or just stop if *error* is false.
+
+    Example:
+
+        for _ in support.busy_retry(support.SHORT_TIMEOUT):
+            if check():
+                break
+
+    Example of error=False usage:
+
+        for _ in support.busy_retry(support.SHORT_TIMEOUT, error=False):
+            if check():
+                break
+        else:
+            raise RuntimeError('my custom error')
+    """
+    if timeout <= 0:
+        raise ValueError("timeout must be greater than zero")
+
+    start_time = time.monotonic()
+    deadline = start_time + timeout
+
+    while True:
+        yield
+
+        if time.monotonic() >= deadline:
+            break
+
+    if error:
+        dt = time.monotonic() - start_time
+        msg = f"timeout ({dt:.1f} seconds)"
+        if err_msg:
+            msg = f"{msg}: {err_msg}"
+        raise AssertionError(msg)
+
+
+def sleeping_retry(timeout, err_msg=None, /,
+                   *, init_delay=0.010, max_delay=1.0, error=True):
+    """
+    Wait strategy that applies exponential backoff.
+
+    Run the loop body until "break" stops the loop. Sleep at each loop
+    iteration, but not at the first iteration. The sleep delay is doubled at
+    each iteration (up to *max_delay* seconds).
+
+    See busy_retry() documentation for the parameter usage.
+
+    Example raising an exception after SHORT_TIMEOUT seconds:
+
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+            if check():
+                break
+
+    Example of error=False usage:
+
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
+            if check():
+                break
+        else:
+            raise RuntimeError('my custom error')
+    """
+    delay = init_delay
+    for _ in busy_retry(timeout, err_msg, error=error):
+        yield
+
+        time.sleep(delay)
+        delay = min(delay * 2, max_delay)
+
+
@contextlib.contextmanager
def adjust_int_max_str_digits(max_digits):
"""Temporarily change the integer string conversion length limit."""
    # run the subinterpreter earlier than the main thread in multiprocess.
    if timeout is None:
        timeout = support.SHORT_TIMEOUT
-    start_time = time.monotonic()
-    deadline = start_time + timeout
-    while not interpreters.is_running(interp):
-        if time.monotonic() > deadline:
-            raise RuntimeError('interp is not running')
-        time.sleep(0.010)
+    for _ in support.sleeping_retry(timeout, error=False):
+        if interpreters.is_running(interp):
+            break
+    else:
+        raise RuntimeError('interp is not running')
@contextlib.contextmanager
        future.result()
        # At some point, the executor should break
-        t1 = time.monotonic()
-        while not self.executor._broken:
-            if time.monotonic() - t1 > 5:
-                self.fail("executor not broken after 5 s.")
-            time.sleep(0.01)
+        for _ in support.sleeping_retry(5, "executor not broken"):
+            if self.executor._broken:
+                break
+
        # ... and from this point submit() is guaranteed to fail
        with self.assertRaises(BrokenExecutor):
            self.executor.submit(get_init_status)
import sys
import time
from multiprocessing import Pool, set_start_method
+from test import support
# We use this __main__ defined function in the map call below in order to
# check that multiprocessing is correctly running the unguarded
results = []
with Pool(5) as pool:
    pool.map_async(f, [1, 2, 3], callback=results.extend)
-    start_time = time.monotonic()
-    while not results:
-        time.sleep(0.05)
-        # up to 1 min to report the results
-        dt = time.monotonic() - start_time
-        if dt > 60.0:
-            raise RuntimeError("Timed out waiting for results (%.1f sec)" % dt)
+
+    # up to 1 min to report the results
+    for _ in support.sleeping_retry(60, "Timed out waiting for results"):
+        if results:
+            break
results.sort()
print(start_method, "->", results)
import sys
import time
from multiprocessing import Pool, set_start_method
+from test import support
start_method = sys.argv[1]
set_start_method(start_method)
results = []
with Pool(5) as pool:
    pool.map_async(int, [1, 4, 9], callback=results.extend)
-    start_time = time.monotonic()
-    while not results:
-        time.sleep(0.05)
-        # up to 1 min to report the results
-        dt = time.monotonic() - start_time
-        if dt > 60.0:
-            raise RuntimeError("Timed out waiting for results (%.1f sec)" % dt)
+    # up to 1 min to report the results
+    for _ in support.sleeping_retry(60, "Timed out waiting for results"):
+        if results:
+            break
results.sort()
print(start_method, "->", results)
        signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
        signal.setitimer(self.itimer, 0.3, 0.2)
-        start_time = time.monotonic()
-        while time.monotonic() - start_time < 60.0:
+        for _ in support.busy_retry(60.0, error=False):
            # use up some virtual time by doing real work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
-                break # sig_vtalrm handler stopped this itimer
-        else: # Issue 8424
+                # sig_vtalrm handler stopped this itimer
+                break
+        else:
+            # bpo-8424
            self.skipTest("timeout: likely cause: machine too slow or load too "
                          "high")
        signal.signal(signal.SIGPROF, self.sig_prof)
        signal.setitimer(self.itimer, 0.2, 0.2)
-        start_time = time.monotonic()
-        while time.monotonic() - start_time < 60.0:
+        for _ in support.busy_retry(60.0, error=False):
            # do some work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
-                break # sig_prof handler stopped this itimer
-        else: # Issue 8424
+                # sig_prof handler stopped this itimer
+                break
+        else:
+            # bpo-8424
            self.skipTest("timeout: likely cause: machine too slow or load too "
                          "high")
        self.setsig(signal.SIGALRM, handler)  # for ITIMER_REAL
        expected_sigs = 0
-        deadline = time.monotonic() + support.SHORT_TIMEOUT
-
        while expected_sigs < N:
            # Hopefully the SIGALRM will be received somewhere during
            # initial processing of SIGUSR1.
            expected_sigs += 2
            # Wait for handlers to run to avoid signal coalescing
-            while len(sigs) < expected_sigs and time.monotonic() < deadline:
-                time.sleep(1e-5)
+            for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
+                if len(sigs) >= expected_sigs:
+                    break
        # All ITIMER_REAL signals should have been delivered to the
        # Python handler
        # A simple I/O loop. Call func(*args); depending on the error we get
        # (WANT_READ or WANT_WRITE), move data between the socket and the BIOs.
        timeout = kwargs.get('timeout', support.SHORT_TIMEOUT)
-        deadline = time.monotonic() + timeout
        count = 0
-        while True:
-            if time.monotonic() > deadline:
-                self.fail("timeout")
+        for _ in support.busy_retry(timeout):
            errno = None
            count += 1
            try:
import sysconfig
import tempfile
import textwrap
-import time
import unittest
import warnings
            # child process: do nothing, just exit
            os._exit(0)
-        t0 = time.monotonic()
-        deadline = time.monotonic() + support.SHORT_TIMEOUT
-
        was_altered = support.environment_altered
        try:
            support.environment_altered = False
            stderr = io.StringIO()
-            while True:
-                if time.monotonic() > deadline:
-                    self.fail("timeout")
-
+            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                with support.swap_attr(support.print_warning, 'orig_stderr', stderr):
                    support.reap_children()
                if support.environment_altered:
                    break
-                # loop until the child process completed
-                time.sleep(0.100)
-
            msg = "Warning -- reap_children() reaped child process %s" % pid
            self.assertIn(msg, stderr.getvalue())
            self.assertTrue(support.environment_altered)
import os
import subprocess
import sys
-import time
import unittest
from test.fork_wait import ForkWait
from test import support
        # This many iterations can be required, since some previously run
        # tests (e.g. test_ctypes) could have spawned a lot of children
        # very quickly.
-        deadline = time.monotonic() + support.SHORT_TIMEOUT
-        while time.monotonic() <= deadline:
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
            # wait3() shouldn't hang, but some of the buildbots seem to hang
            # in the forking tests. This is an attempt to fix the problem.
            spid, status, rusage = os.wait3(os.WNOHANG)
            if spid == cpid:
                break
-            time.sleep(0.1)
        self.assertEqual(spid, cpid)
        self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
"""
import os
-import time
import sys
import unittest
from test.fork_wait import ForkWait
        # Issue #11185: wait4 is broken on AIX and will always return 0
        # with WNOHANG.
        option = 0
-        deadline = time.monotonic() + support.SHORT_TIMEOUT
-        while time.monotonic() <= deadline:
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
            # wait4() shouldn't hang, but some of the buildbots seem to hang
            # in the forking tests. This is an attempt to fix the problem.
            spid, status, rusage = os.wait4(cpid, option)
            if spid == cpid:
                break
-            time.sleep(0.1)
        self.assertEqual(spid, cpid)
        self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
        self.assertTrue(rusage)