]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
[3.12] gh-110756: Sync regrtest with main branch (#110779)
authorVictor Stinner <vstinner@python.org>
Thu, 12 Oct 2023 21:24:12 +0000 (23:24 +0200)
committerGitHub <noreply@github.com>
Thu, 12 Oct 2023 21:24:12 +0000 (21:24 +0000)
gh-110756: Sync regrtest with main branch

* Remove runtest.py and runtest_mp.py of Lib/test/libregrtest/.
* Backport support._parse_memlimit().

Lib/test/libregrtest/runtest.py [deleted file]
Lib/test/libregrtest/runtest_mp.py [deleted file]
Lib/test/support/__init__.py
Lib/test/test_support.py

diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py
deleted file mode 100644 (file)
index 8b7844c..0000000
+++ /dev/null
@@ -1,577 +0,0 @@
-import dataclasses
-import doctest
-import faulthandler
-import gc
-import importlib
-import io
-import os
-import sys
-import time
-import traceback
-import unittest
-
-from test import support
-from test.support import TestStats
-from test.support import os_helper
-from test.support import threading_helper
-from test.libregrtest.cmdline import Namespace
-from test.libregrtest.save_env import saved_test_environment
-from test.libregrtest.utils import clear_caches, format_duration, print_warning
-
-
-MatchTests = list[str]
-MatchTestsDict = dict[str, MatchTests]
-
-
-# Avoid enum.Enum to reduce the number of imports when tests are run
-class State:
-    PASSED = "PASSED"
-    FAILED = "FAILED"
-    SKIPPED = "SKIPPED"
-    UNCAUGHT_EXC = "UNCAUGHT_EXC"
-    REFLEAK = "REFLEAK"
-    ENV_CHANGED = "ENV_CHANGED"
-    RESOURCE_DENIED = "RESOURCE_DENIED"
-    INTERRUPTED = "INTERRUPTED"
-    MULTIPROCESSING_ERROR = "MULTIPROCESSING_ERROR"
-    DID_NOT_RUN = "DID_NOT_RUN"
-    TIMEOUT = "TIMEOUT"
-
-    @staticmethod
-    def is_failed(state):
-        return state in {
-            State.FAILED,
-            State.UNCAUGHT_EXC,
-            State.REFLEAK,
-            State.MULTIPROCESSING_ERROR,
-            State.TIMEOUT}
-
-    @staticmethod
-    def has_meaningful_duration(state):
-        # Consider that the duration is meaningless for these cases.
-        # For example, if a whole test file is skipped, its duration
-        # is unlikely to be the duration of executing its tests,
-        # but just the duration to execute code which skips the test.
-        return state not in {
-            State.SKIPPED,
-            State.RESOURCE_DENIED,
-            State.INTERRUPTED,
-            State.MULTIPROCESSING_ERROR,
-            State.DID_NOT_RUN}
-
-    @staticmethod
-    def must_stop(state):
-        return state in {
-            State.INTERRUPTED,
-            State.MULTIPROCESSING_ERROR}
-
-
-# gh-90681: When rerunning tests, we might need to rerun the whole
-# class or module suite if some its life-cycle hooks fail.
-# Test level hooks are not affected.
-_TEST_LIFECYCLE_HOOKS = frozenset((
-    'setUpClass', 'tearDownClass',
-    'setUpModule', 'tearDownModule',
-))
-
-def normalize_test_name(test_full_name, *, is_error=False):
-    short_name = test_full_name.split(" ")[0]
-    if is_error and short_name in _TEST_LIFECYCLE_HOOKS:
-        if test_full_name.startswith(('setUpModule (', 'tearDownModule (')):
-            # if setUpModule() or tearDownModule() failed, don't filter
-            # tests with the test file name, don't use use filters.
-            return None
-
-        # This means that we have a failure in a life-cycle hook,
-        # we need to rerun the whole module or class suite.
-        # Basically the error looks like this:
-        #    ERROR: setUpClass (test.test_reg_ex.RegTest)
-        # or
-        #    ERROR: setUpModule (test.test_reg_ex)
-        # So, we need to parse the class / module name.
-        lpar = test_full_name.index('(')
-        rpar = test_full_name.index(')')
-        return test_full_name[lpar + 1: rpar].split('.')[-1]
-    return short_name
-
-
-@dataclasses.dataclass(slots=True)
-class TestResult:
-    test_name: str
-    state: str | None = None
-    # Test duration in seconds
-    duration: float | None = None
-    xml_data: list[str] | None = None
-    stats: TestStats | None = None
-
-    # errors and failures copied from support.TestFailedWithDetails
-    errors: list[tuple[str, str]] | None = None
-    failures: list[tuple[str, str]] | None = None
-
-    def is_failed(self, fail_env_changed: bool) -> bool:
-        if self.state == State.ENV_CHANGED:
-            return fail_env_changed
-        return State.is_failed(self.state)
-
-    def _format_failed(self):
-        if self.errors and self.failures:
-            le = len(self.errors)
-            lf = len(self.failures)
-            error_s = "error" + ("s" if le > 1 else "")
-            failure_s = "failure" + ("s" if lf > 1 else "")
-            return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})"
-
-        if self.errors:
-            le = len(self.errors)
-            error_s = "error" + ("s" if le > 1 else "")
-            return f"{self.test_name} failed ({le} {error_s})"
-
-        if self.failures:
-            lf = len(self.failures)
-            failure_s = "failure" + ("s" if lf > 1 else "")
-            return f"{self.test_name} failed ({lf} {failure_s})"
-
-        return f"{self.test_name} failed"
-
-    def __str__(self) -> str:
-        match self.state:
-            case State.PASSED:
-                return f"{self.test_name} passed"
-            case State.FAILED:
-                return self._format_failed()
-            case State.SKIPPED:
-                return f"{self.test_name} skipped"
-            case State.UNCAUGHT_EXC:
-                return f"{self.test_name} failed (uncaught exception)"
-            case State.REFLEAK:
-                return f"{self.test_name} failed (reference leak)"
-            case State.ENV_CHANGED:
-                return f"{self.test_name} failed (env changed)"
-            case State.RESOURCE_DENIED:
-                return f"{self.test_name} skipped (resource denied)"
-            case State.INTERRUPTED:
-                return f"{self.test_name} interrupted"
-            case State.MULTIPROCESSING_ERROR:
-                return f"{self.test_name} process crashed"
-            case State.DID_NOT_RUN:
-                return f"{self.test_name} ran no tests"
-            case State.TIMEOUT:
-                return f"{self.test_name} timed out ({format_duration(self.duration)})"
-            case _:
-                raise ValueError("unknown result state: {state!r}")
-
-    def has_meaningful_duration(self):
-        return State.has_meaningful_duration(self.state)
-
-    def set_env_changed(self):
-        if self.state is None or self.state == State.PASSED:
-            self.state = State.ENV_CHANGED
-
-    def must_stop(self, fail_fast: bool, fail_env_changed: bool) -> bool:
-        if State.must_stop(self.state):
-            return True
-        if fail_fast and self.is_failed(fail_env_changed):
-            return True
-        return False
-
-    def get_rerun_match_tests(self):
-        match_tests = []
-
-        errors = self.errors or []
-        failures = self.failures or []
-        for error_list, is_error in (
-            (errors, True),
-            (failures, False),
-        ):
-            for full_name, *_ in error_list:
-                match_name = normalize_test_name(full_name, is_error=is_error)
-                if match_name is None:
-                    # 'setUpModule (test.test_sys)': don't filter tests
-                    return None
-                if not match_name:
-                    error_type = "ERROR" if is_error else "FAIL"
-                    print_warning(f"rerun failed to parse {error_type} test name: "
-                                  f"{full_name!r}: don't filter tests")
-                    return None
-                match_tests.append(match_name)
-
-        return match_tests
-
-
-@dataclasses.dataclass(slots=True, frozen=True)
-class RunTests:
-    tests: list[str]
-    match_tests: MatchTestsDict | None = None
-    rerun: bool = False
-    forever: bool = False
-
-    def get_match_tests(self, test_name) -> MatchTests | None:
-        if self.match_tests is not None:
-            return self.match_tests.get(test_name, None)
-        else:
-            return None
-
-    def iter_tests(self):
-        tests = tuple(self.tests)
-        if self.forever:
-            while True:
-                yield from tests
-        else:
-            yield from tests
-
-
-# Minimum duration of a test to display its duration or to mention that
-# the test is running in background
-PROGRESS_MIN_TIME = 30.0   # seconds
-
-#If these test directories are encountered recurse into them and treat each
-# test_ .py or dir as a separate test module. This can increase parallelism.
-# Beware this can't generally be done for any directory with sub-tests as the
-# __init__.py may do things which alter what tests are to be run.
-
-SPLITTESTDIRS = {
-    "test_asyncio",
-    "test_concurrent_futures",
-    "test_future_stmt",
-    "test_gdb",
-    "test_multiprocessing_fork",
-    "test_multiprocessing_forkserver",
-    "test_multiprocessing_spawn",
-}
-
-
-def findtestdir(path=None):
-    return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
-
-
-def findtests(*, testdir=None, exclude=(),
-              split_test_dirs=SPLITTESTDIRS, base_mod=""):
-    """Return a list of all applicable test modules."""
-    testdir = findtestdir(testdir)
-    tests = []
-    for name in os.listdir(testdir):
-        mod, ext = os.path.splitext(name)
-        if (not mod.startswith("test_")) or (mod in exclude):
-            continue
-        if mod in split_test_dirs:
-            subdir = os.path.join(testdir, mod)
-            mod = f"{base_mod or 'test'}.{mod}"
-            tests.extend(findtests(testdir=subdir, exclude=exclude,
-                                   split_test_dirs=split_test_dirs, base_mod=mod))
-        elif ext in (".py", ""):
-            tests.append(f"{base_mod}.{mod}" if base_mod else mod)
-    return sorted(tests)
-
-
-def split_test_packages(tests, *, testdir=None, exclude=(),
-                        split_test_dirs=SPLITTESTDIRS):
-    testdir = findtestdir(testdir)
-    splitted = []
-    for name in tests:
-        if name in split_test_dirs:
-            subdir = os.path.join(testdir, name)
-            splitted.extend(findtests(testdir=subdir, exclude=exclude,
-                                      split_test_dirs=split_test_dirs,
-                                      base_mod=name))
-        else:
-            splitted.append(name)
-    return splitted
-
-
-def abs_module_name(test_name: str, test_dir: str | None) -> str:
-    if test_name.startswith('test.') or test_dir:
-        return test_name
-    else:
-        # Import it from the test package
-        return 'test.' + test_name
-
-
-def setup_support(ns: Namespace):
-    support.PGO = ns.pgo
-    support.PGO_EXTENDED = ns.pgo_extended
-    support.set_match_tests(ns.match_tests, ns.ignore_tests)
-    support.failfast = ns.failfast
-    support.verbose = ns.verbose
-    if ns.xmlpath:
-        support.junit_xml_list = []
-    else:
-        support.junit_xml_list = None
-
-
-def _runtest(result: TestResult, ns: Namespace) -> None:
-    # Capture stdout and stderr, set faulthandler timeout,
-    # and create JUnit XML report.
-    verbose = ns.verbose
-    output_on_failure = ns.verbose3
-    timeout = ns.timeout
-
-    use_timeout = (
-        timeout is not None and threading_helper.can_start_thread
-    )
-    if use_timeout:
-        faulthandler.dump_traceback_later(timeout, exit=True)
-
-    try:
-        setup_support(ns)
-
-        if output_on_failure:
-            support.verbose = True
-
-            stream = io.StringIO()
-            orig_stdout = sys.stdout
-            orig_stderr = sys.stderr
-            print_warning = support.print_warning
-            orig_print_warnings_stderr = print_warning.orig_stderr
-
-            output = None
-            try:
-                sys.stdout = stream
-                sys.stderr = stream
-                # print_warning() writes into the temporary stream to preserve
-                # messages order. If support.environment_altered becomes true,
-                # warnings will be written to sys.stderr below.
-                print_warning.orig_stderr = stream
-
-                _runtest_env_changed_exc(result, ns, display_failure=False)
-                # Ignore output if the test passed successfully
-                if result.state != State.PASSED:
-                    output = stream.getvalue()
-            finally:
-                sys.stdout = orig_stdout
-                sys.stderr = orig_stderr
-                print_warning.orig_stderr = orig_print_warnings_stderr
-
-            if output is not None:
-                sys.stderr.write(output)
-                sys.stderr.flush()
-        else:
-            # Tell tests to be moderately quiet
-            support.verbose = verbose
-            _runtest_env_changed_exc(result, ns, display_failure=not verbose)
-
-        xml_list = support.junit_xml_list
-        if xml_list:
-            import xml.etree.ElementTree as ET
-            result.xml_data = [ET.tostring(x).decode('us-ascii')
-                               for x in xml_list]
-    finally:
-        if use_timeout:
-            faulthandler.cancel_dump_traceback_later()
-        support.junit_xml_list = None
-
-
-def runtest(ns: Namespace, test_name: str) -> TestResult:
-    """Run a single test.
-
-    ns -- regrtest namespace of options
-    test_name -- the name of the test
-
-    Returns a TestResult.
-
-    If ns.xmlpath is not None, xml_data is a list containing each
-    generated testsuite element.
-    """
-    start_time = time.perf_counter()
-    result = TestResult(test_name)
-    try:
-        _runtest(result, ns)
-    except:
-        if not ns.pgo:
-            msg = traceback.format_exc()
-            print(f"test {test_name} crashed -- {msg}",
-                  file=sys.stderr, flush=True)
-        result.state = State.UNCAUGHT_EXC
-    result.duration = time.perf_counter() - start_time
-    return result
-
-
-def run_unittest(test_mod):
-    loader = unittest.TestLoader()
-    tests = loader.loadTestsFromModule(test_mod)
-    for error in loader.errors:
-        print(error, file=sys.stderr)
-    if loader.errors:
-        raise Exception("errors while loading tests")
-    return support.run_unittest(tests)
-
-
-def save_env(ns: Namespace, test_name: str):
-    return saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo)
-
-
-def regrtest_runner(result, test_func, ns) -> None:
-    # Run test_func(), collect statistics, and detect reference and memory
-    # leaks.
-    if ns.huntrleaks:
-        from test.libregrtest.refleak import dash_R
-        refleak, test_result = dash_R(ns, result.test_name, test_func)
-    else:
-        test_result = test_func()
-        refleak = False
-
-    if refleak:
-        result.state = State.REFLEAK
-
-    match test_result:
-        case TestStats():
-            stats = test_result
-        case unittest.TestResult():
-            stats = TestStats.from_unittest(test_result)
-        case doctest.TestResults():
-            stats = TestStats.from_doctest(test_result)
-        case None:
-            print_warning(f"{result.test_name} test runner returned None: {test_func}")
-            stats = None
-        case _:
-            print_warning(f"Unknown test result type: {type(test_result)}")
-            stats = None
-
-    result.stats = stats
-
-
-# Storage of uncollectable objects
-FOUND_GARBAGE = []
-
-
-def _load_run_test(result: TestResult, ns: Namespace) -> None:
-    # Load the test function, run the test function.
-    module_name = abs_module_name(result.test_name, ns.testdir)
-
-    # Remove the module from sys.module to reload it if it was already imported
-    sys.modules.pop(module_name, None)
-
-    test_mod = importlib.import_module(module_name)
-
-    if hasattr(test_mod, "test_main"):
-        # https://github.com/python/cpython/issues/89392
-        raise Exception(f"Module {result.test_name} defines test_main() which is no longer supported by regrtest")
-    def test_func():
-        return run_unittest(test_mod)
-
-    try:
-        with save_env(ns, result.test_name):
-            regrtest_runner(result, test_func, ns)
-    finally:
-        # First kill any dangling references to open files etc.
-        # This can also issue some ResourceWarnings which would otherwise get
-        # triggered during the following test run, and possibly produce
-        # failures.
-        support.gc_collect()
-
-        remove_testfn(result.test_name, ns.verbose)
-
-    if gc.garbage:
-        support.environment_altered = True
-        print_warning(f"{result.test_name} created {len(gc.garbage)} "
-                      f"uncollectable object(s)")
-
-        # move the uncollectable objects somewhere,
-        # so we don't see them again
-        FOUND_GARBAGE.extend(gc.garbage)
-        gc.garbage.clear()
-
-    support.reap_children()
-
-
-def _runtest_env_changed_exc(result: TestResult, ns: Namespace,
-                             display_failure: bool = True) -> None:
-    # Detect environment changes, handle exceptions.
-
-    # Reset the environment_altered flag to detect if a test altered
-    # the environment
-    support.environment_altered = False
-
-    if ns.pgo:
-        display_failure = False
-
-    test_name = result.test_name
-    try:
-        clear_caches()
-        support.gc_collect()
-
-        with save_env(ns, test_name):
-            _load_run_test(result, ns)
-    except support.ResourceDenied as msg:
-        if not ns.quiet and not ns.pgo:
-            print(f"{test_name} skipped -- {msg}", flush=True)
-        result.state = State.RESOURCE_DENIED
-        return
-    except unittest.SkipTest as msg:
-        if not ns.quiet and not ns.pgo:
-            print(f"{test_name} skipped -- {msg}", flush=True)
-        result.state = State.SKIPPED
-        return
-    except support.TestFailedWithDetails as exc:
-        msg = f"test {test_name} failed"
-        if display_failure:
-            msg = f"{msg} -- {exc}"
-        print(msg, file=sys.stderr, flush=True)
-        result.state = State.FAILED
-        result.errors = exc.errors
-        result.failures = exc.failures
-        result.stats = exc.stats
-        return
-    except support.TestFailed as exc:
-        msg = f"test {test_name} failed"
-        if display_failure:
-            msg = f"{msg} -- {exc}"
-        print(msg, file=sys.stderr, flush=True)
-        result.state = State.FAILED
-        result.stats = exc.stats
-        return
-    except support.TestDidNotRun:
-        result.state = State.DID_NOT_RUN
-        return
-    except KeyboardInterrupt:
-        print()
-        result.state = State.INTERRUPTED
-        return
-    except:
-        if not ns.pgo:
-            msg = traceback.format_exc()
-            print(f"test {test_name} crashed -- {msg}",
-                  file=sys.stderr, flush=True)
-        result.state = State.UNCAUGHT_EXC
-        return
-
-    if support.environment_altered:
-        result.set_env_changed()
-    # Don't override the state if it was already set (REFLEAK or ENV_CHANGED)
-    if result.state is None:
-        result.state = State.PASSED
-
-
-def remove_testfn(test_name: str, verbose: int) -> None:
-    # Try to clean up os_helper.TESTFN if left behind.
-    #
-    # While tests shouldn't leave any files or directories behind, when a test
-    # fails that can be tedious for it to arrange.  The consequences can be
-    # especially nasty on Windows, since if a test leaves a file open, it
-    # cannot be deleted by name (while there's nothing we can do about that
-    # here either, we can display the name of the offending test, which is a
-    # real help).
-    name = os_helper.TESTFN
-    if not os.path.exists(name):
-        return
-
-    if os.path.isdir(name):
-        import shutil
-        kind, nuker = "directory", shutil.rmtree
-    elif os.path.isfile(name):
-        kind, nuker = "file", os.unlink
-    else:
-        raise RuntimeError(f"os.path says {name!r} exists but is neither "
-                           f"directory nor file")
-
-    if verbose:
-        print_warning(f"{test_name} left behind {kind} {name!r}")
-        support.environment_altered = True
-
-    try:
-        import stat
-        # fix possible permissions problems that might prevent cleanup
-        os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
-        nuker(name)
-    except Exception as exc:
-        print_warning(f"{test_name} left behind {kind} {name!r} "
-                      f"and it couldn't be removed: {exc}")
diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py
deleted file mode 100644 (file)
index 6008955..0000000
+++ /dev/null
@@ -1,631 +0,0 @@
-import dataclasses
-import faulthandler
-import json
-import os.path
-import queue
-import signal
-import subprocess
-import sys
-import tempfile
-import threading
-import time
-import traceback
-from typing import NamedTuple, NoReturn, Literal, Any, TextIO
-
-from test import support
-from test.support import os_helper
-from test.support import TestStats
-
-from test.libregrtest.cmdline import Namespace
-from test.libregrtest.main import Regrtest
-from test.libregrtest.runtest import (
-    runtest, TestResult, State, PROGRESS_MIN_TIME,
-    MatchTests, RunTests)
-from test.libregrtest.setup import setup_tests
-from test.libregrtest.utils import format_duration, print_warning
-
-if sys.platform == 'win32':
-    import locale
-
-
-# Display the running tests if nothing happened last N seconds
-PROGRESS_UPDATE = 30.0   # seconds
-assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME
-
-# Kill the main process after 5 minutes. It is supposed to write an update
-# every PROGRESS_UPDATE seconds. Tolerate 5 minutes for Python slowest
-# buildbot workers.
-MAIN_PROCESS_TIMEOUT = 5 * 60.0
-assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE
-
-# Time to wait until a worker completes: should be immediate
-JOIN_TIMEOUT = 30.0   # seconds
-
-USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))
-
-
-@dataclasses.dataclass(slots=True)
-class WorkerJob:
-    test_name: str
-    namespace: Namespace
-    rerun: bool = False
-    match_tests: MatchTests | None = None
-
-
-class _EncodeWorkerJob(json.JSONEncoder):
-    def default(self, o: Any) -> dict[str, Any]:
-        match o:
-            case WorkerJob():
-                result = dataclasses.asdict(o)
-                result["__worker_job__"] = True
-                return result
-            case Namespace():
-                result = vars(o)
-                result["__namespace__"] = True
-                return result
-            case _:
-                return super().default(o)
-
-
-def _decode_worker_job(d: dict[str, Any]) -> WorkerJob | dict[str, Any]:
-    if "__worker_job__" in d:
-        d.pop('__worker_job__')
-        return WorkerJob(**d)
-    if "__namespace__" in d:
-        d.pop('__namespace__')
-        return Namespace(**d)
-    else:
-        return d
-
-
-def _parse_worker_args(worker_json: str) -> tuple[Namespace, str]:
-    return json.loads(worker_json,
-                      object_hook=_decode_worker_job)
-
-
-def run_test_in_subprocess(worker_job: WorkerJob,
-                           output_file: TextIO,
-                           tmp_dir: str | None = None) -> subprocess.Popen:
-    ns = worker_job.namespace
-    python = ns.python
-    worker_args = json.dumps(worker_job, cls=_EncodeWorkerJob)
-
-    if python is not None:
-        executable = python
-    else:
-        executable = [sys.executable]
-    cmd = [*executable, *support.args_from_interpreter_flags(),
-           '-u',    # Unbuffered stdout and stderr
-           '-m', 'test.regrtest',
-           '--worker-args', worker_args]
-
-    env = dict(os.environ)
-    if tmp_dir is not None:
-        env['TMPDIR'] = tmp_dir
-        env['TEMP'] = tmp_dir
-        env['TMP'] = tmp_dir
-
-    # Running the child from the same working directory as regrtest's original
-    # invocation ensures that TEMPDIR for the child is the same when
-    # sysconfig.is_python_build() is true. See issue 15300.
-    kw = dict(
-        env=env,
-        stdout=output_file,
-        # bpo-45410: Write stderr into stdout to keep messages order
-        stderr=output_file,
-        text=True,
-        close_fds=(os.name != 'nt'),
-        cwd=os_helper.SAVEDCWD,
-    )
-    if USE_PROCESS_GROUP:
-        kw['start_new_session'] = True
-    return subprocess.Popen(cmd, **kw)
-
-
-def run_tests_worker(worker_json: str) -> NoReturn:
-    worker_job = _parse_worker_args(worker_json)
-    ns = worker_job.namespace
-    test_name = worker_job.test_name
-    rerun = worker_job.rerun
-    match_tests = worker_job.match_tests
-
-    setup_tests(ns)
-
-    if rerun:
-        if match_tests:
-            matching = "matching: " + ", ".join(match_tests)
-            print(f"Re-running {test_name} in verbose mode ({matching})", flush=True)
-        else:
-            print(f"Re-running {test_name} in verbose mode", flush=True)
-        ns.verbose = True
-
-    if match_tests is not None:
-        ns.match_tests = match_tests
-
-    result = runtest(ns, test_name)
-    print()   # Force a newline (just in case)
-
-    # Serialize TestResult as dict in JSON
-    print(json.dumps(result, cls=EncodeTestResult), flush=True)
-    sys.exit(0)
-
-
-# We do not use a generator so multiple threads can call next().
-class MultiprocessIterator:
-
-    """A thread-safe iterator over tests for multiprocess mode."""
-
-    def __init__(self, tests_iter):
-        self.lock = threading.Lock()
-        self.tests_iter = tests_iter
-
-    def __iter__(self):
-        return self
-
-    def __next__(self):
-        with self.lock:
-            if self.tests_iter is None:
-                raise StopIteration
-            return next(self.tests_iter)
-
-    def stop(self):
-        with self.lock:
-            self.tests_iter = None
-
-
-class MultiprocessResult(NamedTuple):
-    result: TestResult
-    # bpo-45410: stderr is written into stdout to keep messages order
-    worker_stdout: str | None = None
-    err_msg: str | None = None
-
-
-ExcStr = str
-QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr]
-
-
-class ExitThread(Exception):
-    pass
-
-
-class TestWorkerProcess(threading.Thread):
-    def __init__(self, worker_id: int, runner: "MultiprocessTestRunner") -> None:
-        super().__init__()
-        self.worker_id = worker_id
-        self.runtests = runner.runtests
-        self.pending = runner.pending
-        self.output = runner.output
-        self.ns = runner.ns
-        self.timeout = runner.worker_timeout
-        self.regrtest = runner.regrtest
-        self.rerun = runner.rerun
-        self.current_test_name = None
-        self.start_time = None
-        self._popen = None
-        self._killed = False
-        self._stopped = False
-
-    def __repr__(self) -> str:
-        info = [f'TestWorkerProcess #{self.worker_id}']
-        if self.is_alive():
-            info.append("running")
-        else:
-            info.append('stopped')
-        test = self.current_test_name
-        if test:
-            info.append(f'test={test}')
-        popen = self._popen
-        if popen is not None:
-            dt = time.monotonic() - self.start_time
-            info.extend((f'pid={self._popen.pid}',
-                         f'time={format_duration(dt)}'))
-        return '<%s>' % ' '.join(info)
-
-    def _kill(self) -> None:
-        popen = self._popen
-        if popen is None:
-            return
-
-        if self._killed:
-            return
-        self._killed = True
-
-        if USE_PROCESS_GROUP:
-            what = f"{self} process group"
-        else:
-            what = f"{self}"
-
-        print(f"Kill {what}", file=sys.stderr, flush=True)
-        try:
-            if USE_PROCESS_GROUP:
-                os.killpg(popen.pid, signal.SIGKILL)
-            else:
-                popen.kill()
-        except ProcessLookupError:
-            # popen.kill(): the process completed, the TestWorkerProcess thread
-            # read its exit status, but Popen.send_signal() read the returncode
-            # just before Popen.wait() set returncode.
-            pass
-        except OSError as exc:
-            print_warning(f"Failed to kill {what}: {exc!r}")
-
-    def stop(self) -> None:
-        # Method called from a different thread to stop this thread
-        self._stopped = True
-        self._kill()
-
-    def mp_result_error(
-        self,
-        test_result: TestResult,
-        stdout: str | None = None,
-        err_msg=None
-    ) -> MultiprocessResult:
-        return MultiprocessResult(test_result, stdout, err_msg)
-
-    def _run_process(self, worker_job, output_file: TextIO,
-                     tmp_dir: str | None = None) -> int:
-        self.current_test_name = worker_job.test_name
-        try:
-            popen = run_test_in_subprocess(worker_job, output_file, tmp_dir)
-
-            self._killed = False
-            self._popen = popen
-        except:
-            self.current_test_name = None
-            raise
-
-        try:
-            if self._stopped:
-                # If kill() has been called before self._popen is set,
-                # self._popen is still running. Call again kill()
-                # to ensure that the process is killed.
-                self._kill()
-                raise ExitThread
-
-            try:
-                # gh-94026: stdout+stderr are written to tempfile
-                retcode = popen.wait(timeout=self.timeout)
-                assert retcode is not None
-                return retcode
-            except subprocess.TimeoutExpired:
-                if self._stopped:
-                    # kill() has been called: communicate() fails on reading
-                    # closed stdout
-                    raise ExitThread
-
-                # On timeout, kill the process
-                self._kill()
-
-                # None means TIMEOUT for the caller
-                retcode = None
-                # bpo-38207: Don't attempt to call communicate() again: on it
-                # can hang until all child processes using stdout
-                # pipes completes.
-            except OSError:
-                if self._stopped:
-                    # kill() has been called: communicate() fails
-                    # on reading closed stdout
-                    raise ExitThread
-                raise
-        except:
-            self._kill()
-            raise
-        finally:
-            self._wait_completed()
-            self._popen = None
-            self.current_test_name = None
-
-    def _runtest(self, test_name: str) -> MultiprocessResult:
-        if sys.platform == 'win32':
-            # gh-95027: When stdout is not a TTY, Python uses the ANSI code
-            # page for the sys.stdout encoding. If the main process runs in a
-            # terminal, sys.stdout uses WindowsConsoleIO with UTF-8 encoding.
-            encoding = locale.getencoding()
-        else:
-            encoding = sys.stdout.encoding
-
-        match_tests = self.runtests.get_match_tests(test_name)
-
-        # gh-94026: Write stdout+stderr to a tempfile as workaround for
-        # non-blocking pipes on Emscripten with NodeJS.
-        with tempfile.TemporaryFile('w+', encoding=encoding) as stdout_file:
-            worker_job = WorkerJob(test_name,
-                                   namespace=self.ns,
-                                   rerun=self.rerun,
-                                   match_tests=match_tests)
-            # gh-93353: Check for leaked temporary files in the parent process,
-            # since the deletion of temporary files can happen late during
-            # Python finalization: too late for libregrtest.
-            if not support.is_wasi:
-                # Don't check for leaked temporary files and directories if Python is
-                # run on WASI. WASI don't pass environment variables like TMPDIR to
-                # worker processes.
-                tmp_dir = tempfile.mkdtemp(prefix="test_python_")
-                tmp_dir = os.path.abspath(tmp_dir)
-                try:
-                    retcode = self._run_process(worker_job, stdout_file, tmp_dir)
-                finally:
-                    tmp_files = os.listdir(tmp_dir)
-                    os_helper.rmtree(tmp_dir)
-            else:
-                retcode = self._run_process(worker_job, stdout_file)
-                tmp_files = ()
-            stdout_file.seek(0)
-
-            try:
-                stdout = stdout_file.read().strip()
-            except Exception as exc:
-                # gh-101634: Catch UnicodeDecodeError if stdout cannot be
-                # decoded from encoding
-                err_msg = f"Cannot read process stdout: {exc}"
-                result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
-                return self.mp_result_error(result, err_msg=err_msg)
-
-        if retcode is None:
-            result = TestResult(test_name, state=State.TIMEOUT)
-            return self.mp_result_error(result, stdout)
-
-        err_msg = None
-        if retcode != 0:
-            err_msg = "Exit code %s" % retcode
-        else:
-            stdout, _, worker_json = stdout.rpartition("\n")
-            stdout = stdout.rstrip()
-            if not worker_json:
-                err_msg = "Failed to parse worker stdout"
-            else:
-                try:
-                    # deserialize run_tests_worker() output
-                    result = json.loads(worker_json,
-                                        object_hook=decode_test_result)
-                except Exception as exc:
-                    err_msg = "Failed to parse worker JSON: %s" % exc
-
-        if err_msg:
-            result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
-            return self.mp_result_error(result, stdout, err_msg)
-
-        if tmp_files:
-            msg = (f'\n\n'
-                   f'Warning -- {test_name} leaked temporary files '
-                   f'({len(tmp_files)}): {", ".join(sorted(tmp_files))}')
-            stdout += msg
-            result.set_env_changed()
-
-        return MultiprocessResult(result, stdout)
-
-    def run(self) -> None:
-        fail_fast = self.ns.failfast
-        fail_env_changed = self.ns.fail_env_changed
-        while not self._stopped:
-            try:
-                try:
-                    test_name = next(self.pending)
-                except StopIteration:
-                    break
-
-                self.start_time = time.monotonic()
-                mp_result = self._runtest(test_name)
-                mp_result.result.duration = time.monotonic() - self.start_time
-                self.output.put((False, mp_result))
-
-                if mp_result.result.must_stop(fail_fast, fail_env_changed):
-                    break
-            except ExitThread:
-                break
-            except BaseException:
-                self.output.put((True, traceback.format_exc()))
-                break
-
-    def _wait_completed(self) -> None:
-        popen = self._popen
-
-        try:
-            popen.wait(JOIN_TIMEOUT)
-        except (subprocess.TimeoutExpired, OSError) as exc:
-            print_warning(f"Failed to wait for {self} completion "
-                          f"(timeout={format_duration(JOIN_TIMEOUT)}): "
-                          f"{exc!r}")
-
-    def wait_stopped(self, start_time: float) -> None:
-        # bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop()
-        # which killed the process. Sometimes, killing the process from the
-        # main thread does not interrupt popen.communicate() in
-        # TestWorkerProcess thread. This loop with a timeout is a workaround
-        # for that.
-        #
-        # Moreover, if this method fails to join the thread, it is likely
-        # that Python will hang at exit while calling threading._shutdown()
-        # which tries again to join the blocked thread. Regrtest.main()
-        # uses EXIT_TIMEOUT to workaround this second bug.
-        while True:
-            # Write a message every second
-            self.join(1.0)
-            if not self.is_alive():
-                break
-            dt = time.monotonic() - start_time
-            self.regrtest.log(f"Waiting for {self} thread "
-                              f"for {format_duration(dt)}")
-            if dt > JOIN_TIMEOUT:
-                print_warning(f"Failed to join {self} in {format_duration(dt)}")
-                break
-
-
-def get_running(workers: list[TestWorkerProcess]) -> list[TestWorkerProcess]:
-    running = []
-    for worker in workers:
-        current_test_name = worker.current_test_name
-        if not current_test_name:
-            continue
-        dt = time.monotonic() - worker.start_time
-        if dt >= PROGRESS_MIN_TIME:
-            text = '%s (%s)' % (current_test_name, format_duration(dt))
-            running.append(text)
-    return running
-
-
-class MultiprocessTestRunner:
-    def __init__(self, regrtest: Regrtest, runtests: RunTests) -> None:
-        ns = regrtest.ns
-        timeout = ns.timeout
-
-        self.regrtest = regrtest
-        self.runtests = runtests
-        self.rerun = runtests.rerun
-        self.log = self.regrtest.log
-        self.ns = ns
-        self.output: queue.Queue[QueueOutput] = queue.Queue()
-        tests_iter = runtests.iter_tests()
-        self.pending = MultiprocessIterator(tests_iter)
-        if timeout is not None:
-            # Rely on faulthandler to kill a worker process. This timouet is
-            # when faulthandler fails to kill a worker process. Give a maximum
-            # of 5 minutes to faulthandler to kill the worker.
-            self.worker_timeout = min(timeout * 1.5, timeout + 5 * 60)
-        else:
-            self.worker_timeout = None
-        self.workers = None
-
-    def start_workers(self) -> None:
-        use_mp = self.ns.use_mp
-        timeout = self.ns.timeout
-        self.workers = [TestWorkerProcess(index, self)
-                        for index in range(1, use_mp + 1)]
-        msg = f"Run tests in parallel using {len(self.workers)} child processes"
-        if timeout:
-            msg += (" (timeout: %s, worker timeout: %s)"
-                    % (format_duration(timeout),
-                       format_duration(self.worker_timeout)))
-        self.log(msg)
-        for worker in self.workers:
-            worker.start()
-
-    def stop_workers(self) -> None:
-        start_time = time.monotonic()
-        for worker in self.workers:
-            worker.stop()
-        for worker in self.workers:
-            worker.wait_stopped(start_time)
-
-    def _get_result(self) -> QueueOutput | None:
-        pgo = self.ns.pgo
-        use_faulthandler = (self.ns.timeout is not None)
-        timeout = PROGRESS_UPDATE
-
-        # bpo-46205: check the status of workers every iteration to avoid
-        # waiting forever on an empty queue.
-        while any(worker.is_alive() for worker in self.workers):
-            if use_faulthandler:
-                faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
-                                                  exit=True)
-
-            # wait for a thread
-            try:
-                return self.output.get(timeout=timeout)
-            except queue.Empty:
-                pass
-
-            # display progress
-            running = get_running(self.workers)
-            if running and not pgo:
-                self.log('running: %s' % ', '.join(running))
-
-        # all worker threads are done: consume pending results
-        try:
-            return self.output.get(timeout=0)
-        except queue.Empty:
-            return None
-
-    def display_result(self, mp_result: MultiprocessResult) -> None:
-        result = mp_result.result
-        pgo = self.ns.pgo
-
-        text = str(result)
-        if mp_result.err_msg:
-            # MULTIPROCESSING_ERROR
-            text += ' (%s)' % mp_result.err_msg
-        elif (result.duration >= PROGRESS_MIN_TIME and not pgo):
-            text += ' (%s)' % format_duration(result.duration)
-        running = get_running(self.workers)
-        if running and not pgo:
-            text += ' -- running: %s' % ', '.join(running)
-        self.regrtest.display_progress(self.test_index, text)
-
-    def _process_result(self, item: QueueOutput) -> bool:
-        """Returns True if test runner must stop."""
-        rerun = self.runtests.rerun
-        if item[0]:
-            # Thread got an exception
-            format_exc = item[1]
-            print_warning(f"regrtest worker thread failed: {format_exc}")
-            result = TestResult("<regrtest worker>", state=State.MULTIPROCESSING_ERROR)
-            self.regrtest.accumulate_result(result, rerun=rerun)
-            return result
-
-        self.test_index += 1
-        mp_result = item[1]
-        result = mp_result.result
-        self.regrtest.accumulate_result(result, rerun=rerun)
-        self.display_result(mp_result)
-
-        if mp_result.worker_stdout:
-            print(mp_result.worker_stdout, flush=True)
-
-        return result
-
-    def run_tests(self) -> None:
-        fail_fast = self.ns.failfast
-        fail_env_changed = self.ns.fail_env_changed
-        timeout = self.ns.timeout
-
-        self.start_workers()
-
-        self.test_index = 0
-        try:
-            while True:
-                item = self._get_result()
-                if item is None:
-                    break
-
-                result = self._process_result(item)
-                if result.must_stop(fail_fast, fail_env_changed):
-                    break
-        except KeyboardInterrupt:
-            print()
-            self.regrtest.interrupted = True
-        finally:
-            if timeout is not None:
-                faulthandler.cancel_dump_traceback_later()
-
-            # Always ensure that all worker processes are no longer
-            # worker when we exit this function
-            self.pending.stop()
-            self.stop_workers()
-
-
-def run_tests_multiprocess(regrtest: Regrtest, runtests: RunTests) -> None:
-    MultiprocessTestRunner(regrtest, runtests).run_tests()
-
-
-class EncodeTestResult(json.JSONEncoder):
-    """Encode a TestResult (sub)class object into a JSON dict."""
-
-    def default(self, o: Any) -> dict[str, Any]:
-        if isinstance(o, TestResult):
-            result = dataclasses.asdict(o)
-            result["__test_result__"] = o.__class__.__name__
-            return result
-
-        return super().default(o)
-
-
-def decode_test_result(d: dict[str, Any]) -> TestResult | dict[str, Any]:
-    """Decode a TestResult (sub)class object from a JSON dict."""
-
-    if "__test_result__" not in d:
-        return d
-
-    d.pop('__test_result__')
-    if d['stats'] is not None:
-        d['stats'] = TestStats(**d['stats'])
-    return TestResult(**d)
index 37fd9443e18007100ae9983e9504522a1bcfa8c4..ded8ad96606a7cfbd5ee3b7e95e3251a2f2e156c 100644 (file)
@@ -400,19 +400,19 @@ def check_sanitizer(*, address=False, memory=False, ub=False):
         raise ValueError('At least one of address, memory, or ub must be True')
 
 
-    _cflags = sysconfig.get_config_var('CFLAGS') or ''
-    _config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
+    cflags = sysconfig.get_config_var('CFLAGS') or ''
+    config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
     memory_sanitizer = (
-        '-fsanitize=memory' in _cflags or
-        '--with-memory-sanitizer' in _config_args
+        '-fsanitize=memory' in cflags or
+        '--with-memory-sanitizer' in config_args
     )
     address_sanitizer = (
-        '-fsanitize=address' in _cflags or
-        '--with-address-sanitizer' in _config_args
+        '-fsanitize=address' in cflags or
+        '--with-address-sanitizer' in config_args
     )
     ub_sanitizer = (
-        '-fsanitize=undefined' in _cflags or
-        '--with-undefined-behavior-sanitizer' in _config_args
+        '-fsanitize=undefined' in cflags or
+        '--with-undefined-behavior-sanitizer' in config_args
     )
     return (
         (memory and memory_sanitizer) or
@@ -916,27 +916,31 @@ _4G = 4 * _1G
 
 MAX_Py_ssize_t = sys.maxsize
 
-def set_memlimit(limit):
-    global max_memuse
-    global real_max_memuse
+def _parse_memlimit(limit: str) -> int:
     sizes = {
         'k': 1024,
         'm': _1M,
         'g': _1G,
         't': 1024*_1G,
     }
-    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
+    m = re.match(r'(\d+(?:\.\d+)?) (K|M|G|T)b?$', limit,
                  re.IGNORECASE | re.VERBOSE)
     if m is None:
-        raise ValueError('Invalid memory limit %r' % (limit,))
-    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
-    real_max_memuse = memlimit
-    if memlimit > MAX_Py_ssize_t:
-        memlimit = MAX_Py_ssize_t
+        raise ValueError(f'Invalid memory limit: {limit!r}')
+    return int(float(m.group(1)) * sizes[m.group(2).lower()])
+
+def set_memlimit(limit: str) -> None:
+    global max_memuse
+    global real_max_memuse
+    memlimit = _parse_memlimit(limit)
     if memlimit < _2G - 1:
-        raise ValueError('Memory limit %r too low to be useful' % (limit,))
+        raise ValueError('Memory limit {limit!r} too low to be useful')
+
+    real_max_memuse = memlimit
+    memlimit = min(memlimit, MAX_Py_ssize_t)
     max_memuse = memlimit
 
+
 class _MemoryWatchdog:
     """An object which periodically watches the process' memory consumption
     and prints it out.
index b9b05fc4306a31c28d23c0c08b355d3d288ff291..4a93249af313cf42a7701ee15a598255d1fb1f59 100644 (file)
@@ -764,7 +764,45 @@ class TestSupport(unittest.TestCase):
             else:
                 self.fail("RecursionError was not raised")
 
-        #self.assertEqual(available, 2)
+    def test_parse_memlimit(self):
+        parse = support._parse_memlimit
+        KiB = 1024
+        MiB = KiB * 1024
+        GiB = MiB * 1024
+        TiB = GiB * 1024
+        self.assertEqual(parse('0k'), 0)
+        self.assertEqual(parse('3k'), 3 * KiB)
+        self.assertEqual(parse('2.4m'), int(2.4 * MiB))
+        self.assertEqual(parse('4g'), int(4 * GiB))
+        self.assertEqual(parse('1t'), TiB)
+
+        for limit in ('', '3', '3.5.10k', '10x'):
+            with self.subTest(limit=limit):
+                with self.assertRaises(ValueError):
+                    parse(limit)
+
+    def test_set_memlimit(self):
+        _4GiB = 4 * 1024 ** 3
+        TiB = 1024 ** 4
+        old_max_memuse = support.max_memuse
+        old_real_max_memuse = support.real_max_memuse
+        try:
+            if sys.maxsize > 2**32:
+                support.set_memlimit('4g')
+                self.assertEqual(support.max_memuse, _4GiB)
+                self.assertEqual(support.real_max_memuse, _4GiB)
+
+                big = 2**100 // TiB
+                support.set_memlimit(f'{big}t')
+                self.assertEqual(support.max_memuse, sys.maxsize)
+                self.assertEqual(support.real_max_memuse, big * TiB)
+            else:
+                support.set_memlimit('4g')
+                self.assertEqual(support.max_memuse, sys.maxsize)
+                self.assertEqual(support.real_max_memuse, _4GiB)
+        finally:
+            support.max_memuse = old_max_memuse
+            support.real_max_memuse = old_real_max_memuse
 
     def test_copy_python_src_ignore(self):
         # Get source directory
@@ -813,7 +851,6 @@ class TestSupport(unittest.TestCase):
     # EnvironmentVarGuard
     # transient_internet
     # run_with_locale
-    # set_memlimit
     # bigmemtest
     # precisionbigmemtest
     # bigaddrspacetest