'(instead of the Python stdlib test suite)')
group = parser.add_argument_group('Special runs')
- group.add_argument('-l', '--findleaks', action='store_true',
- help='if GC is available detect tests that leak memory')
+ group.add_argument('-l', '--findleaks', action='store_const', const=2,
+ default=1,
+ help='deprecated alias to --fail-env-changed')
group.add_argument('-L', '--runleaks', action='store_true',
help='run the leaks(1) command just before exit.' +
more_details)
# Defaults
ns = argparse.Namespace(testdir=None, verbose=0, quiet=False,
exclude=False, single=False, randomize=False, fromfile=None,
- findleaks=False, use_resources=None, trace=False, coverdir='coverage',
+ findleaks=1, use_resources=None, trace=False, coverdir='coverage',
runleaks=False, huntrleaks=False, verbose2=False, print_slow=False,
random_seed=None, use_mp=None, verbose3=False, forever=False,
header=False, failfast=False, match_tests=None, pgo=False)
parser.error("unrecognized arguments: %s" % arg)
sys.exit(1)
+ if ns.findleaks > 1:
+ # --findleaks implies --fail-env-changed
+ ns.fail_env_changed = True
if ns.single and ns.fromfile:
parser.error("-s and -f don't go together!")
if ns.use_mp is not None and ns.trace:
parser.error("-T and -j don't go together!")
- if ns.use_mp is not None and ns.findleaks:
- parser.error("-l and -j don't go together!")
if ns.failfast and not (ns.verbose or ns.verbose3):
parser.error("-G/--failfast needs either -v or -W")
if ns.pgo and (ns.verbose or ns.verbose2 or ns.verbose3):
from test.libregrtest.setup import setup_tests
from test.libregrtest.utils import removepy, count, format_duration, printlist
from test import support
-try:
- import gc
-except ImportError:
- gc = None
# When tests are run from the Python build directory, it is best practice
self.skipped = []
self.resource_denieds = []
self.environment_changed = []
- self.rerun = []
self.run_no_tests = []
+ self.rerun = []
self.first_result = None
self.interrupted = False
# used by --coverage, trace.Trace instance
self.tracer = None
- # used by --findleaks, store for gc.garbage
- self.found_garbage = []
-
# used to display the progress bar "[ 3/100]"
self.start_time = time.monotonic()
self.test_count = ''
# used by --junit-xml
self.testsuite_xml = None
- def accumulate_result(self, test, result):
- ok, test_time, xml_data = result
- if ok not in (CHILD_ERROR, INTERRUPTED):
- self.test_times.append((test_time, test))
+ self.win_load_tracker = None
+
+ def get_executed(self):
+ return (set(self.good) | set(self.bad) | set(self.skipped)
+ | set(self.resource_denieds) | set(self.environment_changed)
+ | set(self.run_no_tests))
+
+ def accumulate_result(self, result, rerun=False):
+ test_name = result.test_name
+ ok = result.result
+
+ if ok not in (CHILD_ERROR, INTERRUPTED) and not rerun:
+ self.test_times.append((result.test_time, test_name))
+
if ok == PASSED:
- self.good.append(test)
+ self.good.append(test_name)
elif ok in (FAILED, CHILD_ERROR):
- self.bad.append(test)
+ if not rerun:
+ self.bad.append(test_name)
elif ok == ENV_CHANGED:
- self.environment_changed.append(test)
+ self.environment_changed.append(test_name)
elif ok == SKIPPED:
- self.skipped.append(test)
+ self.skipped.append(test_name)
elif ok == RESOURCE_DENIED:
- self.skipped.append(test)
- self.resource_denieds.append(test)
+ self.skipped.append(test_name)
+ self.resource_denieds.append(test_name)
elif ok == TEST_DID_NOT_RUN:
- self.run_no_tests.append(test)
- elif ok != INTERRUPTED:
+ self.run_no_tests.append(test_name)
+ elif ok == INTERRUPTED:
+ self.interrupted = True
+ else:
raise ValueError("invalid test result: %r" % ok)
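+        # a re-run that no longer fails (passed, skipped, env changed, ...)
+        # removes the test from the list of failed tests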
+ if rerun and ok not in {FAILED, CHILD_ERROR, INTERRUPTED}:
+ self.bad.remove(test_name)
+
+ xml_data = result.xml_data
if xml_data:
import xml.etree.ElementTree as ET
for e in xml_data:
print(xml_data, file=sys.__stderr__)
raise
- def display_progress(self, test_index, test):
+ def display_progress(self, test_index, text):
if self.ns.quiet:
return
fails = len(self.bad) + len(self.environment_changed)
if fails and not self.ns.pgo:
line = f"{line}/{fails}"
- line = f"[{line}] {test}"
+ line = f"[{line}] {text}"
# add the system load prefix: "load avg: 1.80 "
- if hasattr(os, 'getloadavg'):
- load_avg_1min = os.getloadavg()[0]
- line = f"load avg: {load_avg_1min:.2f} {line}"
+ load_avg = self.getloadavg()
+ if load_avg is not None:
+ line = f"load avg: {load_avg:.2f} {line}"
# add the timestamp prefix: "0:01:05 "
test_time = time.monotonic() - self.start_time
"faulthandler.dump_traceback_later", file=sys.stderr)
ns.timeout = None
- if ns.threshold is not None and gc is None:
- print('No GC available, ignore --threshold.', file=sys.stderr)
- ns.threshold = None
-
- if ns.findleaks:
- if gc is not None:
- # Uncomment the line below to report garbage that is not
- # freeable by reference counting alone. By default only
- # garbage that is not collectable by the GC is reported.
- pass
- #gc.set_debug(gc.DEBUG_SAVEALL)
- else:
- print('No GC available, disabling --findleaks',
- file=sys.stderr)
- ns.findleaks = False
-
if ns.xmlpath:
support.junit_xml_list = self.testsuite_xml = []
support.verbose = False
support.set_match_tests(self.ns.match_tests)
- for test in self.selected:
- abstest = get_abs_module(self.ns, test)
+ for test_name in self.selected:
+ abstest = get_abs_module(self.ns, test_name)
try:
suite = unittest.defaultTestLoader.loadTestsFromName(abstest)
self._list_cases(suite)
except unittest.SkipTest:
- self.skipped.append(test)
+ self.skipped.append(test_name)
if self.skipped:
print(file=sys.stderr)
print()
print("Re-running failed tests in verbose mode")
self.rerun = self.bad[:]
- for test in self.rerun:
- print("Re-running test %r in verbose mode" % test, flush=True)
- try:
- self.ns.verbose = True
- ok = runtest(self.ns, test)
- except KeyboardInterrupt:
- self.interrupted = True
- # print a newline separate from the ^C
- print()
+ for test_name in self.rerun:
+ print(f"Re-running {test_name} in verbose mode", flush=True)
+ self.ns.verbose = True
+ result = runtest(self.ns, test_name)
+
+ self.accumulate_result(result, rerun=True)
+
+ if result.result == INTERRUPTED:
break
- else:
- if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}:
- self.bad.remove(test)
- else:
- if self.bad:
- print(count(len(self.bad), 'test'), "failed again:")
- printlist(self.bad)
+
+ if self.bad:
+ print(count(len(self.bad), 'test'), "failed again:")
+ printlist(self.bad)
self.display_result()
print("== Tests result: %s ==" % self.get_tests_result())
if self.interrupted:
- print()
- # print a newline after ^C
print("Test suite interrupted by signal SIGINT.")
- executed = set(self.good) | set(self.bad) | set(self.skipped)
- omitted = set(self.selected) - executed
+
+ omitted = set(self.selected) - self.get_executed()
+ if omitted:
+ print()
print(count(len(omitted), "test"), "omitted:")
printlist(omitted)
self.test_times.sort(reverse=True)
print()
print("10 slowest tests:")
- for time, test in self.test_times[:10]:
- print("- %s: %s" % (test, format_duration(time)))
+ for test_time, test in self.test_times[:10]:
+ print("- %s: %s" % (test, format_duration(test_time)))
if self.bad:
print()
print("Run tests sequentially")
previous_test = None
- for test_index, test in enumerate(self.tests, 1):
+ for test_index, test_name in enumerate(self.tests, 1):
start_time = time.monotonic()
- text = test
+ text = test_name
if previous_test:
text = '%s -- %s' % (text, previous_test)
self.display_progress(test_index, text)
if self.tracer:
                # If we're tracing code coverage, then we don't exit with status
                # on a false return value from main.
- cmd = ('result = runtest(self.ns, test); '
- 'self.accumulate_result(test, result)')
+ cmd = ('result = runtest(self.ns, test_name); '
+ 'self.accumulate_result(result)')
ns = dict(locals())
self.tracer.runctx(cmd, globals=globals(), locals=ns)
result = ns['result']
else:
- try:
- result = runtest(self.ns, test)
- except KeyboardInterrupt:
- self.interrupted = True
- self.accumulate_result(test, (INTERRUPTED, None, None))
- break
- else:
- self.accumulate_result(test, result)
-
- previous_test = format_test_result(test, result[0])
+ result = runtest(self.ns, test_name)
+ self.accumulate_result(result)
+
+ if result.result == INTERRUPTED:
+ break
+
+ previous_test = format_test_result(result)
test_time = time.monotonic() - start_time
if test_time >= PROGRESS_MIN_TIME:
previous_test = "%s in %s" % (previous_test, format_duration(test_time))
# be quiet: say nothing if the test passed shortly
previous_test = None
- if self.ns.findleaks:
- gc.collect()
- if gc.garbage:
- print("Warning: test created", len(gc.garbage), end=' ')
- print("uncollectable object(s).")
- # move the uncollectable objects somewhere so we don't see
- # them again
- self.found_garbage.extend(gc.garbage)
- del gc.garbage[:]
-
# Unload the newly imported modules (best effort finalization)
for module in sys.modules.keys():
if module not in save_modules and module.startswith("test."):
def _test_forever(self, tests):
while True:
- for test in tests:
- yield test
+ for test_name in tests:
+ yield test_name
if self.bad:
return
if self.ns.fail_env_changed and self.environment_changed:
self.run_tests_sequential()
def finalize(self):
+ if self.win_load_tracker is not None:
+ self.win_load_tracker.close()
+ self.win_load_tracker = None
+
if self.next_single_filename:
if self.next_single_test:
with open(self.next_single_filename, 'w') as fp:
with support.temp_cwd(test_cwd, quiet=True):
self._main(tests, kwargs)
+ def getloadavg(self):
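+        # Return the 1-minute load average, or None if no source is
+        # available on this platform.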
+ if self.win_load_tracker is not None:
+ return self.win_load_tracker.getloadavg()
+
+ if hasattr(os, 'getloadavg'):
+ return os.getloadavg()[0]
+
+ return None
+
def _main(self, tests, kwargs):
if self.ns.huntrleaks:
warmup, repetitions, _ = self.ns.huntrleaks
self.list_cases()
sys.exit(0)
+    # If we're on Windows and this is the parent runner (not a worker),
+ # track the load average.
+ if sys.platform == 'win32' and (self.ns.worker_args is None):
+ from test.libregrtest.win_utils import WindowsLoadTracker
+
+ try:
+ self.win_load_tracker = WindowsLoadTracker()
+ except FileNotFoundError as error:
+ # Windows IoT Core and Windows Nano Server do not provide
+ # typeperf.exe for x64, x86 or ARM
+ print(f'Failed to create WindowsLoadTracker: {error}')
+
self.run_tests()
self.display_result()
-import errno
import os
import re
import sys
cls._abc_negative_cache, cls._abc_negative_cache_version)
-def dash_R(ns, the_module, test_name, test_func):
+def dash_R(ns, test_name, test_func):
"""Run a test multiple times, looking for reference leaks.
Returns:
+import collections
import faulthandler
+import functools
+import gc
import importlib
import io
import os
import time
import traceback
import unittest
+
from test import support
from test.libregrtest.refleak import dash_R, clear_caches
from test.libregrtest.save_env import saved_test_environment
+from test.libregrtest.utils import print_warning
# Test result constants.
NOTTESTS = set()
-def format_test_result(test_name, result):
- fmt = _FORMAT_TEST_RESULT.get(result, "%s")
- return fmt % test_name
+# used by --findleaks, store for gc.garbage
+FOUND_GARBAGE = []
+
+
+def format_test_result(result):
+ fmt = _FORMAT_TEST_RESULT.get(result.result, "%s")
+ return fmt % result.test_name
+
+
+def findtestdir(path=None):
+ return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS):
return stdtests + sorted(tests)
-def get_abs_module(ns, test):
- if test.startswith('test.') or ns.testdir:
- return test
+def get_abs_module(ns, test_name):
+ if test_name.startswith('test.') or ns.testdir:
+ return test_name
else:
- # Always import it from the test package
- return 'test.' + test
-
-
-def runtest(ns, test):
- """Run a single test.
+ # Import it from the test package
+ return 'test.' + test_name
- ns -- regrtest namespace of options
- test -- the name of the test
- Returns the tuple (result, test_time, xml_data), where result is one
- of the constants:
+TestResult = collections.namedtuple('TestResult',
+ 'test_name result test_time xml_data')
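+# test_name: name of the test module; result: one of the result constants;
+# test_time: duration in seconds; xml_data: list of serialized <testsuite>
+# elements when --junit-xml is used, else None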
- INTERRUPTED KeyboardInterrupt when run under -j
- RESOURCE_DENIED test skipped because resource denied
- SKIPPED test skipped for some other reason
- ENV_CHANGED test failed because it changed the execution environment
- FAILED test failed
- PASSED test passed
- EMPTY_TEST_SUITE test ran no subtests.
-
- If ns.xmlpath is not None, xml_data is a list containing each
- generated testsuite element.
- """
+def _runtest(ns, test_name):
+ # Handle faulthandler timeout, capture stdout+stderr, XML serialization
+ # and measure time.
output_on_failure = ns.verbose3
use_timeout = (ns.timeout is not None)
if use_timeout:
faulthandler.dump_traceback_later(ns.timeout, exit=True)
+
+ start_time = time.perf_counter()
try:
support.set_match_tests(ns.match_tests)
- # reset the environment_altered flag to detect if a test altered
- # the environment
- support.environment_altered = False
support.junit_xml_list = xml_list = [] if ns.xmlpath else None
if ns.failfast:
support.failfast = True
+
if output_on_failure:
support.verbose = True
try:
sys.stdout = stream
sys.stderr = stream
- result = runtest_inner(ns, test, display_failure=False)
- if result[0] != PASSED:
+ result = _runtest_inner(ns, test_name,
+ display_failure=False)
+ if result != PASSED:
output = stream.getvalue()
orig_stderr.write(output)
orig_stderr.flush()
sys.stdout = orig_stdout
sys.stderr = orig_stderr
else:
- support.verbose = ns.verbose # Tell tests to be moderately quiet
- result = runtest_inner(ns, test, display_failure=not ns.verbose)
+ # Tell tests to be moderately quiet
+ support.verbose = ns.verbose
+
+ result = _runtest_inner(ns, test_name,
+ display_failure=not ns.verbose)
if xml_list:
import xml.etree.ElementTree as ET
xml_data = [ET.tostring(x).decode('us-ascii') for x in xml_list]
else:
xml_data = None
- return result + (xml_data,)
+
+ test_time = time.perf_counter() - start_time
+
+ return TestResult(test_name, result, test_time, xml_data)
finally:
if use_timeout:
faulthandler.cancel_dump_traceback_later()
- cleanup_test_droppings(test, ns.verbose)
support.junit_xml_list = None
-def post_test_cleanup():
+def runtest(ns, test_name):
+ """Run a single test.
+
+ ns -- regrtest namespace of options
+ test_name -- the name of the test
+
+    Returns a TestResult namedtuple (test_name, result, test_time,
+    xml_data), where result is one of the constants:
+
+ INTERRUPTED KeyboardInterrupt
+ RESOURCE_DENIED test skipped because resource denied
+ SKIPPED test skipped for some other reason
+ ENV_CHANGED test failed because it changed the execution environment
+ FAILED test failed
+ PASSED test passed
+ EMPTY_TEST_SUITE test ran no subtests.
+
+ If ns.xmlpath is not None, xml_data is a list containing each
+ generated testsuite element.
+ """
+ try:
+ return _runtest(ns, test_name)
+ except:
+ if not ns.pgo:
+ msg = traceback.format_exc()
+ print(f"test {test_name} crashed -- {msg}",
+ file=sys.stderr, flush=True)
+ return TestResult(test_name, FAILED, 0.0, None)
+
+
+def _test_module(the_module):
+ loader = unittest.TestLoader()
+ tests = loader.loadTestsFromModule(the_module)
+ for error in loader.errors:
+ print(error, file=sys.stderr)
+ if loader.errors:
+ raise Exception("errors while loading tests")
+ support.run_unittest(tests)
+
+
+def _runtest_inner2(ns, test_name):
+    # Load the test module, run its tests, use huntrleaks to detect
+    # reference leaks, and check gc.garbage for uncollectable objects
+
+ abstest = get_abs_module(ns, test_name)
+
+    # remove the module from sys.modules to reload it if it was already imported
+ support.unload(abstest)
+
+ the_module = importlib.import_module(abstest)
+
+ # If the test has a test_main, that will run the appropriate
+ # tests. If not, use normal unittest test loading.
+ test_runner = getattr(the_module, "test_main", None)
+ if test_runner is None:
+ test_runner = functools.partial(_test_module, the_module)
+
+ try:
+ if ns.huntrleaks:
+ # Return True if the test leaked references
+ refleak = dash_R(ns, test_name, test_runner)
+ else:
+ test_runner()
+ refleak = False
+ finally:
+ cleanup_test_droppings(test_name, ns.verbose)
+
+ support.gc_collect()
+
+ if gc.garbage:
+ support.environment_altered = True
+ print_warning(f"{test_name} created {len(gc.garbage)} "
+ f"uncollectable object(s).")
+
+ # move the uncollectable objects somewhere,
+ # so we don't see them again
+ FOUND_GARBAGE.extend(gc.garbage)
+ gc.garbage.clear()
+
support.reap_children()
+ return refleak
+
+
+def _runtest_inner(ns, test_name, display_failure=True):
+ # Detect environment changes, handle exceptions.
-def runtest_inner(ns, test, display_failure=True):
- support.unload(test)
+ # Reset the environment_altered flag to detect if a test altered
+ # the environment
+ support.environment_altered = False
+
+ if ns.pgo:
+ display_failure = False
- test_time = 0.0
- refleak = False # True if the test leaked references.
try:
- abstest = get_abs_module(ns, test)
clear_caches()
- with saved_test_environment(test, ns.verbose, ns.quiet, pgo=ns.pgo) as environment:
- start_time = time.perf_counter()
- the_module = importlib.import_module(abstest)
- # If the test has a test_main, that will run the appropriate
- # tests. If not, use normal unittest test loading.
- test_runner = getattr(the_module, "test_main", None)
- if test_runner is None:
- def test_runner():
- loader = unittest.TestLoader()
- tests = loader.loadTestsFromModule(the_module)
- for error in loader.errors:
- print(error, file=sys.stderr)
- if loader.errors:
- raise Exception("errors while loading tests")
- support.run_unittest(tests)
- if ns.huntrleaks:
- refleak = dash_R(ns, the_module, test, test_runner)
- else:
- test_runner()
- test_time = time.perf_counter() - start_time
- post_test_cleanup()
+
+ with saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo) as environment:
+ refleak = _runtest_inner2(ns, test_name)
except support.ResourceDenied as msg:
if not ns.quiet and not ns.pgo:
- print(test, "skipped --", msg, flush=True)
- return RESOURCE_DENIED, test_time
+ print(f"{test_name} skipped -- {msg}", flush=True)
+ return RESOURCE_DENIED
except unittest.SkipTest as msg:
if not ns.quiet and not ns.pgo:
- print(test, "skipped --", msg, flush=True)
- return SKIPPED, test_time
- except KeyboardInterrupt:
- raise
- except support.TestFailed as msg:
- if not ns.pgo:
- if display_failure:
- print("test", test, "failed --", msg, file=sys.stderr,
- flush=True)
- else:
- print("test", test, "failed", file=sys.stderr, flush=True)
- return FAILED, test_time
+ print(f"{test_name} skipped -- {msg}", flush=True)
+ return SKIPPED
+ except support.TestFailed as exc:
+ msg = f"test {test_name} failed"
+ if display_failure:
+ msg = f"{msg} -- {exc}"
+ print(msg, file=sys.stderr, flush=True)
+ return FAILED
except support.TestDidNotRun:
- return TEST_DID_NOT_RUN, test_time
+ return TEST_DID_NOT_RUN
+ except KeyboardInterrupt:
+ print()
+ return INTERRUPTED
except:
- msg = traceback.format_exc()
if not ns.pgo:
- print("test", test, "crashed --", msg, file=sys.stderr,
- flush=True)
- return FAILED, test_time
- else:
- if refleak:
- return FAILED, test_time
- if environment.changed:
- return ENV_CHANGED, test_time
- return PASSED, test_time
+ msg = traceback.format_exc()
+ print(f"test {test_name} crashed -- {msg}",
+ file=sys.stderr, flush=True)
+ return FAILED
+ if refleak:
+ return FAILED
+ if environment.changed:
+ return ENV_CHANGED
+ return PASSED
-def cleanup_test_droppings(testname, verbose):
- import shutil
- import stat
- import gc
+def cleanup_test_droppings(test_name, verbose):
# First kill any dangling references to open files etc.
# This can also issue some ResourceWarnings which would otherwise get
# triggered during the following test run, and possibly produce failures.
- gc.collect()
+ support.gc_collect()
# Try to clean up junk commonly left behind. While tests shouldn't leave
# any files or directories behind, when a test fails that can be tedious
continue
if os.path.isdir(name):
+ import shutil
kind, nuker = "directory", shutil.rmtree
elif os.path.isfile(name):
kind, nuker = "file", os.unlink
else:
- raise SystemError("os.path says %r exists but is neither "
- "directory nor file" % name)
+ raise RuntimeError(f"os.path says {name!r} exists but is neither "
+ f"directory nor file")
if verbose:
- print("%r left behind %s %r" % (testname, kind, name))
+ print_warning("%r left behind %s %r" % (test_name, kind, name))
+ support.environment_altered = True
+
try:
- # if we have chmod, fix possible permissions problems
- # that might prevent cleanup
- if (hasattr(os, 'chmod')):
- os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+ import stat
+ # fix possible permissions problems that might prevent cleanup
+ os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
nuker(name)
- except Exception as msg:
- print(("%r left behind %s %r and it couldn't be "
- "removed: %s" % (testname, kind, name, msg)), file=sys.stderr)
-
-
-def findtestdir(path=None):
- return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
+ except Exception as exc:
+ print_warning(f"{test_name} left behind {kind} {name!r} "
+ f"and it couldn't be removed: {exc}")
+import collections
import faulthandler
import json
import os
import queue
+import subprocess
import sys
import threading
import time
from test.libregrtest.runtest import (
runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME,
- format_test_result)
+ format_test_result, TestResult)
from test.libregrtest.setup import setup_tests
from test.libregrtest.utils import format_duration
# Display the running tests if nothing happened last N seconds
PROGRESS_UPDATE = 30.0 # seconds
-# If interrupted, display the wait progress every N seconds
-WAIT_PROGRESS = 2.0 # seconds
+def must_stop(result):
+ return result.result in (INTERRUPTED, CHILD_ERROR)
-def run_test_in_subprocess(testname, ns):
- """Run the given test in a subprocess with --worker-args.
-
- ns is the option Namespace parsed from command-line arguments. regrtest
- is invoked in a subprocess with the --worker-args argument; when the
- subprocess exits, its return code, stdout and stderr are returned as a
- 3-tuple.
- """
- from subprocess import Popen, PIPE
+def run_test_in_subprocess(testname, ns):
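+    # Spawn the worker process and return its Popen object; the caller
+    # consumes its stdout/stderr and waits for it (see MultiprocessThread).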
ns_dict = vars(ns)
worker_args = (ns_dict, testname)
worker_args = json.dumps(worker_args)
# Running the child from the same working directory as regrtest's original
# invocation ensures that TEMPDIR for the child is the same when
# sysconfig.is_python_build() is true. See issue 15300.
- popen = Popen(cmd,
- stdout=PIPE, stderr=PIPE,
- universal_newlines=True,
- close_fds=(os.name != 'nt'),
- cwd=support.SAVEDCWD)
- with popen:
- stdout, stderr = popen.communicate()
- retcode = popen.wait()
- return retcode, stdout, stderr
+ return subprocess.Popen(cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ universal_newlines=True,
+ close_fds=(os.name != 'nt'),
+ cwd=support.SAVEDCWD)
def run_tests_worker(worker_args):
setup_tests(ns)
- try:
- result = runtest(ns, testname)
- except KeyboardInterrupt:
- result = INTERRUPTED, '', None
- except BaseException as e:
- traceback.print_exc()
- result = CHILD_ERROR, str(e)
-
+ result = runtest(ns, testname)
print() # Force a newline (just in case)
print(json.dumps(result), flush=True)
sys.exit(0)
"""A thread-safe iterator over tests for multiprocess mode."""
def __init__(self, tests):
- self.interrupted = False
self.lock = threading.Lock()
self.tests = tests
def __next__(self):
with self.lock:
- if self.interrupted:
- raise StopIteration('tests interrupted')
return next(self.tests)
+MultiprocessResult = collections.namedtuple('MultiprocessResult',
+ 'result stdout stderr error_msg')
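+# result: TestResult from the worker; stdout/stderr: output captured from
+# the worker process; error_msg: diagnostic set when the worker process
+# failed (CHILD_ERROR), else None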
+
class MultiprocessThread(threading.Thread):
def __init__(self, pending, output, ns):
super().__init__()
self.pending = pending
self.output = output
self.ns = ns
- self.current_test = None
+ self.current_test_name = None
self.start_time = None
+ self._popen = None
- def _runtest(self):
- try:
- test = next(self.pending)
- except StopIteration:
- self.output.put((None, None, None, None))
- return True
+ def kill(self):
+ if not self.is_alive():
+ return
+ if self._popen is not None:
+ self._popen.kill()
+ def _runtest(self, test_name):
try:
self.start_time = time.monotonic()
- self.current_test = test
-
- retcode, stdout, stderr = run_test_in_subprocess(test, self.ns)
+ self.current_test_name = test_name
+
+ popen = run_test_in_subprocess(test_name, self.ns)
+ self._popen = popen
+ with popen:
+ try:
+ stdout, stderr = popen.communicate()
+ except:
+ popen.kill()
+ popen.wait()
+ raise
+
+ retcode = popen.wait()
finally:
- self.current_test = None
+ self.current_test_name = None
+ self._popen = None
- if retcode != 0:
- result = (CHILD_ERROR, "Exit code %s" % retcode, None)
- self.output.put((test, stdout.rstrip(), stderr.rstrip(),
- result))
- return False
-
- stdout, _, result = stdout.strip().rpartition("\n")
- if not result:
- self.output.put((None, None, None, None))
- return True
+ stdout = stdout.strip()
+ stderr = stderr.rstrip()
- result = json.loads(result)
- assert len(result) == 3, f"Invalid result tuple: {result!r}"
- self.output.put((test, stdout.rstrip(), stderr.rstrip(),
- result))
- return False
+ err_msg = None
+ if retcode != 0:
+ err_msg = "Exit code %s" % retcode
+ else:
+ stdout, _, result = stdout.rpartition("\n")
+ stdout = stdout.rstrip()
+ if not result:
+ err_msg = "Failed to parse worker stdout"
+ else:
+ try:
+ # deserialize run_tests_worker() output
+ result = json.loads(result)
+ result = TestResult(*result)
+ except Exception as exc:
+ err_msg = "Failed to parse worker JSON: %s" % exc
+
+ if err_msg is not None:
+ test_time = time.monotonic() - self.start_time
+ result = TestResult(test_name, CHILD_ERROR, test_time, None)
+
+ return MultiprocessResult(result, stdout, stderr, err_msg)
def run(self):
- try:
- stop = False
- while not stop:
- stop = self._runtest()
- except BaseException:
- self.output.put((None, None, None, None))
- raise
+ while True:
+ try:
+ try:
+ test_name = next(self.pending)
+ except StopIteration:
+ break
+ mp_result = self._runtest(test_name)
+ self.output.put((False, mp_result))
-def run_tests_multiprocess(regrtest):
- output = queue.Queue()
- pending = MultiprocessIterator(regrtest.tests)
- test_timeout = regrtest.ns.timeout
- use_timeout = (test_timeout is not None)
-
- workers = [MultiprocessThread(pending, output, regrtest.ns)
- for i in range(regrtest.ns.use_mp)]
- print("Run tests in parallel using %s child processes"
- % len(workers))
+ if must_stop(mp_result.result):
+ break
+ except BaseException:
+ self.output.put((True, traceback.format_exc()))
+ break
+
+
+def get_running(workers):
+ running = []
for worker in workers:
- worker.start()
-
- def get_running(workers):
- running = []
- for worker in workers:
- current_test = worker.current_test
- if not current_test:
- continue
- dt = time.monotonic() - worker.start_time
- if dt >= PROGRESS_MIN_TIME:
- text = '%s (%s)' % (current_test, format_duration(dt))
- running.append(text)
- return running
-
- finished = 0
- test_index = 1
- get_timeout = max(PROGRESS_UPDATE, PROGRESS_MIN_TIME)
- try:
- while finished < regrtest.ns.use_mp:
- if use_timeout:
- faulthandler.dump_traceback_later(test_timeout, exit=True)
+ current_test_name = worker.current_test_name
+ if not current_test_name:
+ continue
+ dt = time.monotonic() - worker.start_time
+ if dt >= PROGRESS_MIN_TIME:
+ text = '%s (%s)' % (current_test_name, format_duration(dt))
+ running.append(text)
+ return running
+
+
+class MultiprocessRunner:
+ def __init__(self, regrtest):
+ self.regrtest = regrtest
+ self.ns = regrtest.ns
+ self.output = queue.Queue()
+ self.pending = MultiprocessIterator(self.regrtest.tests)
+ if self.ns.timeout is not None:
+ self.test_timeout = self.ns.timeout * 1.5
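+            # the parent waits 1.5 x the per-test timeout, presumably to
+            # let the worker's own faulthandler watchdog fire first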
+ else:
+ self.test_timeout = None
+ self.workers = None
+
+ def start_workers(self):
+ self.workers = [MultiprocessThread(self.pending, self.output, self.ns)
+ for _ in range(self.ns.use_mp)]
+ print("Run tests in parallel using %s child processes"
+ % len(self.workers))
+ for worker in self.workers:
+ worker.start()
+
+ def wait_workers(self):
+ for worker in self.workers:
+ worker.kill()
+ for worker in self.workers:
+ worker.join()
+
+ def _get_result(self):
+ if not any(worker.is_alive() for worker in self.workers):
+ # all worker threads are done: consume pending results
+ try:
+ return self.output.get(timeout=0)
+ except queue.Empty:
+ return None
+
+ while True:
+ if self.test_timeout is not None:
+ faulthandler.dump_traceback_later(self.test_timeout, exit=True)
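+                # calling dump_traceback_later() again replaces the
+                # previously scheduled timeout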
+            # wait for a result from a worker thread
+ timeout = max(PROGRESS_UPDATE, PROGRESS_MIN_TIME)
try:
- item = output.get(timeout=get_timeout)
+ return self.output.get(timeout=timeout)
except queue.Empty:
- running = get_running(workers)
- if running and not regrtest.ns.pgo:
- print('running: %s' % ', '.join(running), flush=True)
- continue
-
- test, stdout, stderr, result = item
- if test is None:
- finished += 1
- continue
- regrtest.accumulate_result(test, result)
-
- # Display progress
- ok, test_time, xml_data = result
- text = format_test_result(test, ok)
- if (ok not in (CHILD_ERROR, INTERRUPTED)
- and test_time >= PROGRESS_MIN_TIME
- and not regrtest.ns.pgo):
- text += ' (%s)' % format_duration(test_time)
- elif ok == CHILD_ERROR:
- text = '%s (%s)' % (text, test_time)
- running = get_running(workers)
- if running and not regrtest.ns.pgo:
- text += ' -- running: %s' % ', '.join(running)
- regrtest.display_progress(test_index, text)
-
- # Copy stdout and stderr from the child process
- if stdout:
- print(stdout, flush=True)
- if stderr and not regrtest.ns.pgo:
- print(stderr, file=sys.stderr, flush=True)
-
- if result[0] == INTERRUPTED:
- raise KeyboardInterrupt
- test_index += 1
- except KeyboardInterrupt:
- regrtest.interrupted = True
- pending.interrupted = True
- print()
- finally:
- if use_timeout:
- faulthandler.cancel_dump_traceback_later()
-
- # If tests are interrupted, wait until tests complete
- wait_start = time.monotonic()
- while True:
- running = [worker.current_test for worker in workers]
- running = list(filter(bool, running))
- if not running:
- break
-
- dt = time.monotonic() - wait_start
- line = "Waiting for %s (%s tests)" % (', '.join(running), len(running))
- if dt >= WAIT_PROGRESS:
- line = "%s since %.0f sec" % (line, dt)
- print(line, flush=True)
- for worker in workers:
- worker.join(WAIT_PROGRESS)
+ pass
+
+ # display progress
+ running = get_running(self.workers)
+ if running and not self.ns.pgo:
+ print('running: %s' % ', '.join(running), flush=True)
+
+ def display_result(self, mp_result):
+ result = mp_result.result
+
+ text = format_test_result(result)
+ if mp_result.error_msg is not None:
+ # CHILD_ERROR
+ text += ' (%s)' % mp_result.error_msg
+ elif (result.test_time >= PROGRESS_MIN_TIME and not self.ns.pgo):
+ text += ' (%s)' % format_duration(result.test_time)
+ running = get_running(self.workers)
+ if running and not self.ns.pgo:
+ text += ' -- running: %s' % ', '.join(running)
+ self.regrtest.display_progress(self.test_index, text)
+
+ def _process_result(self, item):
+ if item[0]:
+ # Thread got an exception
+ format_exc = item[1]
+ print(f"regrtest worker thread failed: {format_exc}",
+ file=sys.stderr, flush=True)
+ return True
+
+ self.test_index += 1
+ mp_result = item[1]
+ self.regrtest.accumulate_result(mp_result.result)
+ self.display_result(mp_result)
+
+ if mp_result.stdout:
+ print(mp_result.stdout, flush=True)
+ if mp_result.stderr and not self.ns.pgo:
+ print(mp_result.stderr, file=sys.stderr, flush=True)
+
+ if must_stop(mp_result.result):
+ return True
+
+ return False
+
+ def run_tests(self):
+ self.start_workers()
+
+ self.test_index = 0
+ try:
+ while True:
+ item = self._get_result()
+ if item is None:
+ break
+
+ stop = self._process_result(item)
+ if stop:
+ break
+ except KeyboardInterrupt:
+ print()
+ self.regrtest.interrupted = True
+ finally:
+ if self.test_timeout is not None:
+ faulthandler.cancel_dump_traceback_later()
+
+ self.wait_workers()
+
+
+def run_tests_multiprocess(regrtest):
+ MultiprocessRunner(regrtest).run_tests()
import threading
import warnings
from test import support
+from test.libregrtest.utils import print_warning
try:
import _multiprocessing, multiprocessing.process
except ImportError:
self.changed = True
restore(original)
if not self.quiet and not self.pgo:
- print(f"Warning -- {name} was modified by {self.testname}",
- file=sys.stderr, flush=True)
+ print_warning(f"{name} was modified by {self.testname}")
print(f" Before: {original}\n After: {current} ",
file=sys.stderr, flush=True)
return False
-import os.path
import math
+import os.path
+import sys
import textwrap
print(textwrap.fill(' '.join(str(elt) for elt in sorted(x)), width,
initial_indent=blanks, subsequent_indent=blanks),
file=file)
+
+
+def print_warning(msg):
+ print(f"Warning -- {msg}", file=sys.stderr, flush=True)
--- /dev/null
+import _winapi
+import msvcrt
+import os
+import subprocess
+import uuid
+from test import support
+
+
+# Max size of asynchronous reads
+BUFSIZE = 8192
+# Exponential damping factor (see below)
+LOAD_FACTOR_1 = 0.9200444146293232478931553241
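+# (LOAD_FACTOR_1 equals exp(-SAMPLING_INTERVAL / 60): the decay weight of a
+# one-minute exponentially weighted moving average sampled every 5 seconds)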
+# Seconds per measurement
+SAMPLING_INTERVAL = 5
+COUNTER_NAME = r'\System\Processor Queue Length'
+
+
+class WindowsLoadTracker():
+ """
+ This class asynchronously interacts with the `typeperf` command to read
+    the system load on Windows. Multiprocessing and threads can't be used
+ here because they interfere with the test suite's cases for those
+ modules.
+ """
+
+ def __init__(self):
+ self.load = 0.0
+ self.start()
+
+ def start(self):
+ # Create a named pipe which allows for asynchronous IO in Windows
+ pipe_name = r'\\.\pipe\typeperf_output_' + str(uuid.uuid4())
+
+ open_mode = _winapi.PIPE_ACCESS_INBOUND
+ open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
+ open_mode |= _winapi.FILE_FLAG_OVERLAPPED
+
+ # This is the read end of the pipe, where we will be grabbing output
+ self.pipe = _winapi.CreateNamedPipe(
+ pipe_name, open_mode, _winapi.PIPE_WAIT,
+ 1, BUFSIZE, BUFSIZE, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
+ )
+ # The write end of the pipe which is passed to the created process
+ pipe_write_end = _winapi.CreateFile(
+ pipe_name, _winapi.GENERIC_WRITE, 0, _winapi.NULL,
+ _winapi.OPEN_EXISTING, 0, _winapi.NULL
+ )
+        # Wrap the write handle in a C runtime file descriptor so we can
+        # pass it to subprocess as stdout
+ command_stdout = msvcrt.open_osfhandle(pipe_write_end, 0)
+
+        # Connect to the read end of the pipe in overlapped (async) mode
+ overlap = _winapi.ConnectNamedPipe(self.pipe, overlapped=True)
+ overlap.GetOverlappedResult(True)
+
+ # Spawn off the load monitor
+ command = ['typeperf', COUNTER_NAME, '-si', str(SAMPLING_INTERVAL)]
+ self.p = subprocess.Popen(command, stdout=command_stdout, cwd=support.SAVEDCWD)
+
+ # Close our copy of the write end of the pipe
+ os.close(command_stdout)
+
+ def close(self):
+ if self.p is None:
+ return
+ self.p.kill()
+ self.p.wait()
+ self.p = None
+
+ def __del__(self):
+ self.close()
+
+ def read_output(self):
+ overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True)
+ bytes_read, res = overlapped.GetOverlappedResult(False)
+ if res != 0:
+ return
+
+ return overlapped.getbuffer().decode()
+
+ def getloadavg(self):
+ typeperf_output = self.read_output()
+ # Nothing to update, just return the current load
+ if not typeperf_output:
+ return self.load
+
+ # Process the backlog of load values
+ for line in typeperf_output.splitlines():
+ # typeperf outputs in a CSV format like this:
+ # "07/19/2018 01:32:26.605","3.000000"
+ toks = line.split(',')
+ # Ignore blank lines and the initial header
+ if line.strip() == '' or (COUNTER_NAME in line) or len(toks) != 2:
+ continue
+
+ load = float(toks[1].replace('"', ''))
+ # We use an exponentially weighted moving average, imitating the
+ # load calculation on Unix systems.
+ # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
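+            # e.g. a previous load of 1.0 and a new sample of 3.0 give
+            # 1.0 * 0.92 + 3.0 * 0.08 ~= 1.16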
+ new_load = self.load * LOAD_FACTOR_1 + load * (1.0 - LOAD_FACTOR_1)
+ self.load = new_load
+
+ return self.load
from test.libregrtest import utils
-Py_DEBUG = hasattr(sys, 'getobjects')
+Py_DEBUG = hasattr(sys, 'gettotalrefcount')
ROOT_DIR = os.path.join(os.path.dirname(__file__), '..', '..')
ROOT_DIR = os.path.abspath(os.path.normpath(ROOT_DIR))
self.assertTrue(ns.quiet)
self.assertEqual(ns.verbose, 0)
- def test_slow(self):
+ def test_slowest(self):
for opt in '-o', '--slowest':
with self.subTest(opt=opt):
ns = libregrtest._parse_args([opt])
self.checkError([opt], 'expected one argument')
self.checkError([opt, 'foo'], 'invalid int value')
self.checkError([opt, '2', '-T'], "don't go together")
- self.checkError([opt, '2', '-l'], "don't go together")
self.checkError([opt, '0', '-T'], "don't go together")
- self.checkError([opt, '0', '-l'], "don't go together")
def test_coverage(self):
for opt in '-T', '--coverage':
regex = list_regex('%s re-run test%s', rerun)
self.check_line(output, regex)
self.check_line(output, "Re-running failed tests in verbose mode")
- for name in rerun:
- regex = "Re-running test %r in verbose mode" % name
+ for test_name in rerun:
+ regex = f"Re-running {test_name} in verbose mode"
self.check_line(output, regex)
if no_test_ran:
result.append('SUCCESS')
result = ', '.join(result)
if rerun:
- self.check_line(output, 'Tests result: %s' % result)
+ self.check_line(output, 'Tests result: FAILURE')
result = 'FAILURE then %s' % result
self.check_line(output, 'Tests result: %s' % result)
% (self.TESTNAME_REGEX, len(tests)))
self.check_line(output, regex)
- def test_slow_interrupted(self):
+ def test_slowest_interrupted(self):
# Issue #25373: test --slowest with an interrupted test
code = TEST_INTERRUPTED
test = self.create_test("sigint", code=code)
for multiprocessing in (False, True):
- if multiprocessing:
- args = ("--slowest", "-j2", test)
- else:
- args = ("--slowest", test)
- output = self.run_tests(*args, exitcode=130)
- self.check_executed_tests(output, test,
- omitted=test, interrupted=True)
-
- regex = ('10 slowest tests:\n')
- self.check_line(output, regex)
+ with self.subTest(multiprocessing=multiprocessing):
+ if multiprocessing:
+ args = ("--slowest", "-j2", test)
+ else:
+ args = ("--slowest", test)
+ output = self.run_tests(*args, exitcode=130)
+ self.check_executed_tests(output, test,
+ omitted=test, interrupted=True)
+
+ regex = ('10 slowest tests:\n')
+ self.check_line(output, regex)
def test_coverage(self):
# test --coverage
testname)
self.assertEqual(output.splitlines(), all_methods)
+ @support.cpython_only
def test_crashed(self):
# Any code which causes a crash
code = 'import faulthandler; faulthandler._sigsegv()'
crash_test = self.create_test(name="crash", code=code)
- ok_test = self.create_test(name="ok")
- tests = [crash_test, ok_test]
+ tests = [crash_test]
output = self.run_tests("-j2", *tests, exitcode=2)
self.check_executed_tests(output, tests, failed=crash_test,
randomize=True)
fail_env_changed=True)
def test_rerun_fail(self):
+ # FAILURE then FAILURE
code = textwrap.dedent("""
import unittest
self.check_executed_tests(output, [testname],
failed=testname, rerun=testname)
+ def test_rerun_success(self):
+ # FAILURE then SUCCESS
+ code = textwrap.dedent("""
+ import builtins
+ import unittest
+
+ class Tests(unittest.TestCase):
+ failed = False
+
+ def test_fail_once(self):
+ if not hasattr(builtins, '_test_failed'):
+ builtins._test_failed = True
+ self.fail("bug")
+ """)
+ testname = self.create_test(code=code)
+
+ output = self.run_tests("-w", testname, exitcode=0)
+ self.check_executed_tests(output, [testname],
+ rerun=testname)
+
def test_no_tests_ran(self):
code = textwrap.dedent("""
import unittest
self.check_executed_tests(output, [testname, testname2],
no_test_ran=[testname])
+ @support.cpython_only
+ def test_findleaks(self):
+ code = textwrap.dedent(r"""
+ import _testcapi
+ import gc
+ import unittest
+
+ @_testcapi.with_tp_del
+ class Garbage:
+ def __tp_del__(self):
+ pass
+
+ class Tests(unittest.TestCase):
+ def test_garbage(self):
+ # create an uncollectable object
+ obj = Garbage()
+ obj.ref_cycle = obj
+ obj = None
+ """)
+ testname = self.create_test(code=code)
+
+ output = self.run_tests("--fail-env-changed", testname, exitcode=3)
+ self.check_executed_tests(output, [testname],
+ env_changed=[testname],
+ fail_env_changed=True)
+
+ # --findleaks is now basically an alias to --fail-env-changed
+ output = self.run_tests("--findleaks", testname, exitcode=3)
+ self.check_executed_tests(output, [testname],
+ env_changed=[testname],
+ fail_env_changed=True)
+
class TestUtils(unittest.TestCase):
def test_format_duration(self):
--- /dev/null
+Clean up code which checked the presence of ``os.stat`` / ``os.lstat`` /
+``os.chmod``, which are always present. Patch by Anthony Sottile.
--- /dev/null
+When using multiprocessing mode (-jN), regrtest now better reports errors if
+a worker process fails, and it exits immediately on a worker thread failure
+or when interrupted.
--- /dev/null
+regrtest now always detects uncollectable objects. Previously, the check was
+only enabled by ``--findleaks``. The check now also works with
+``-jN/--multiprocess N``. ``--findleaks`` becomes a deprecated alias to
+``--fail-env-changed``.
--- /dev/null
+Report the system load when running the test suite on Windows. Patch by Ammar Askar.
+Based on prior work by Jeremy Kloth.