self.worker_json = None
self.start = None
self.timeout = None
+ self.memlimit = None
+ self.threshold = None
super().__init__(**kwargs)
self.hunt_refleak = None
self.test_dir: str | None = ns.testdir
self.junit_filename: str | None = ns.xmlpath
+ self.memory_limit: str | None = ns.memlimit
+ self.gc_threshold: int | None = ns.threshold
+ self.use_resources: list[str] = ns.use_resources
+ self.python_cmd: list[str] | None = ns.python
# tests
self.tests = []
return runtests
def rerun_failed_tests(self, need_rerun, runtests: RunTests):
- if self.ns.python:
+ if self.python_cmd:
# Temp patch for https://github.com/python/cpython/issues/94052
self.log(
"Re-running failed tests is not supported with --python "
if tracer is not None:
# If we're tracing code coverage, then we don't exit with a status
# code based on a false return value from main.
- cmd = ('result = run_single_test(test_name, runtests, self.ns)')
+ cmd = ('result = run_single_test(test_name, runtests)')
ns = dict(locals())
tracer.runctx(cmd, globals=globals(), locals=ns)
result = ns['result']
else:
- result = run_single_test(test_name, runtests, self.ns)
+ result = run_single_test(test_name, runtests)
self.accumulate_result(result)
quiet=self.quiet,
hunt_refleak=self.hunt_refleak,
test_dir=self.test_dir,
- junit_filename=self.junit_filename)
-
- setup_tests(runtests, self.ns)
+ junit_filename=self.junit_filename,
+ memory_limit=self.memory_limit,
+ gc_threshold=self.gc_threshold,
+ use_resources=self.use_resources,
+ python_cmd=self.python_cmd,
+ )
+
+ setup_tests(runtests)
tracer = self.run_tests(runtests)
self.display_result(runtests)
import gc
import importlib
import io
+import json
import os
import sys
import time
import traceback
import unittest
+from typing import Any
from test import support
from test.support import TestStats
from test.support import os_helper
from test.support import threading_helper
-from test.libregrtest.cmdline import Namespace
from test.libregrtest.save_env import saved_test_environment
from test.libregrtest.utils import clear_caches, format_duration, print_warning
hunt_refleak: HuntRefleak | None = None
test_dir: str | None = None
junit_filename: str | None = None
+ memory_limit: str | None = None
+ gc_threshold: int | None = None
+ use_resources: list[str] | None = None
+ python_cmd: list[str] | None = None
def copy(self, **override):
state = dataclasses.asdict(self)
else:
yield from self.tests
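+ # Serialize/deserialize RunTests as JSON so it can be passed to worker
+ # processes (see create_worker_process() and worker_process()).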
+ def as_json(self) -> str:
+ return json.dumps(self, cls=_EncodeRunTests)
+
@staticmethod
- def from_json_dict(json_dict):
- if json_dict['hunt_refleak']:
- json_dict['hunt_refleak'] = HuntRefleak(**json_dict['hunt_refleak'])
- return RunTests(**json_dict)
+ def from_json(worker_json: str) -> 'RunTests':
+ return json.loads(worker_json, object_hook=_decode_runtests)
+
+
+class _EncodeRunTests(json.JSONEncoder):
+ def default(self, o: Any) -> dict[str, Any]:
+ if isinstance(o, RunTests):
+ result = dataclasses.asdict(o)
+ result["__runtests__"] = True
+ return result
+ else:
+ return super().default(o)
+
+
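+# json.loads() object_hook: rebuild RunTests (and its nested HuntRefleak)
+# from a dict tagged with "__runtests__".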
+def _decode_runtests(data: dict[str, Any]) -> RunTests | dict[str, Any]:
+ if "__runtests__" in data:
+ data.pop('__runtests__')
+ if data['hunt_refleak']:
+ data['hunt_refleak'] = HuntRefleak(**data['hunt_refleak'])
+ return RunTests(**data)
+ else:
+ return data
# Minimum duration of a test to display its duration or to mention that
return 'test.' + test_name
-def setup_support(runtests: RunTests, ns: Namespace):
+def setup_support(runtests: RunTests):
support.PGO = runtests.pgo
support.PGO_EXTENDED = runtests.pgo_extended
support.set_match_tests(runtests.match_tests, runtests.ignore_tests)
support.junit_xml_list = None
-def _runtest(result: TestResult, runtests: RunTests, ns: Namespace) -> None:
+def _runtest(result: TestResult, runtests: RunTests) -> None:
# Capture stdout and stderr, set faulthandler timeout,
# and create JUnit XML report.
verbose = runtests.verbose
faulthandler.dump_traceback_later(timeout, exit=True)
try:
- setup_support(runtests, ns)
+ setup_support(runtests)
if output_on_failure:
support.verbose = True
# warnings will be written to sys.stderr below.
print_warning.orig_stderr = stream
- _runtest_env_changed_exc(result, runtests, ns, display_failure=False)
+ _runtest_env_changed_exc(result, runtests, display_failure=False)
# Ignore output if the test passed successfully
if result.state != State.PASSED:
output = stream.getvalue()
else:
# Tell tests to be moderately quiet
support.verbose = verbose
- _runtest_env_changed_exc(result, runtests, ns,
+ _runtest_env_changed_exc(result, runtests,
display_failure=not verbose)
xml_list = support.junit_xml_list
support.junit_xml_list = None
-def run_single_test(test_name: str, runtests: RunTests, ns: Namespace) -> TestResult:
+def run_single_test(test_name: str, runtests: RunTests) -> TestResult:
"""Run a single test.
- ns -- regrtest namespace of options
test_name -- the name of the test
Returns a TestResult.
result = TestResult(test_name)
pgo = runtests.pgo
try:
- _runtest(result, runtests, ns)
+ _runtest(result, runtests)
except:
if not pgo:
msg = traceback.format_exc()
FOUND_GARBAGE = []
-def _load_run_test(result: TestResult, runtests: RunTests, ns: Namespace) -> None:
+def _load_run_test(result: TestResult, runtests: RunTests) -> None:
# Load the test function, run the test function.
module_name = abs_module_name(result.test_name, runtests.test_dir)
def _runtest_env_changed_exc(result: TestResult, runtests: RunTests,
- ns: Namespace,
display_failure: bool = True) -> None:
# Detect environment changes, handle exceptions.
support.gc_collect()
with save_env(test_name, runtests):
- _load_run_test(result, runtests, ns)
+ _load_run_test(result, runtests)
except support.ResourceDenied as msg:
if not quiet and not pgo:
print(f"{test_name} skipped -- {msg}", flush=True)
@dataclasses.dataclass(slots=True)
class WorkerJob:
runtests: RunTests
- namespace: Namespace
-class _EncodeWorkerJob(json.JSONEncoder):
- def default(self, o: Any) -> dict[str, Any]:
- match o:
- case WorkerJob():
- result = dataclasses.asdict(o)
- result["__worker_job__"] = True
- return result
- case Namespace():
- result = vars(o)
- result["__namespace__"] = True
- return result
- case _:
- return super().default(o)
-
-
-def _decode_worker_job(d: dict[str, Any]) -> WorkerJob | dict[str, Any]:
- if "__worker_job__" in d:
- d.pop('__worker_job__')
- d['runtests'] = RunTests.from_json_dict(d['runtests'])
- return WorkerJob(**d)
- if "__namespace__" in d:
- d.pop('__namespace__')
- return Namespace(**d)
- else:
- return d
-
-
-def _parse_worker_json(worker_json: str) -> tuple[Namespace, str]:
- return json.loads(worker_json, object_hook=_decode_worker_job)
-
-
-def create_worker_process(worker_job: WorkerJob,
+def create_worker_process(runtests: RunTests,
output_file: TextIO,
tmp_dir: str | None = None) -> subprocess.Popen:
- ns = worker_job.namespace
- python = ns.python
- worker_json = json.dumps(worker_job, cls=_EncodeWorkerJob)
+ python_cmd = runtests.python_cmd
+ worker_json = runtests.as_json()
- if python is not None:
- executable = python
+ if python_cmd is not None:
+ executable = python_cmd
else:
executable = [sys.executable]
cmd = [*executable, *support.args_from_interpreter_flags(),
def worker_process(worker_json: str) -> NoReturn:
- worker_job = _parse_worker_json(worker_json)
- runtests = worker_job.runtests
- ns = worker_job.namespace
+ runtests = RunTests.from_json(worker_json)
test_name = runtests.tests[0]
match_tests: FilterTuple | None = runtests.match_tests
setup_test_dir(runtests.test_dir)
- setup_tests(runtests, ns)
+ setup_tests(runtests)
if runtests.rerun:
if match_tests:
else:
print(f"Re-running {test_name} in verbose mode", flush=True)
- result = run_single_test(test_name, runtests, ns)
+ result = run_single_test(test_name, runtests)
print() # Force a newline (just in case)
# Serialize TestResult as dict in JSON
if match_tests:
kwargs['match_tests'] = match_tests
worker_runtests = self.runtests.copy(tests=tests, **kwargs)
- worker_job = WorkerJob(
- worker_runtests,
- namespace=self.ns)
# gh-94026: Write stdout+stderr to a tempfile as workaround for
# non-blocking pipes on Emscripten with NodeJS.
tmp_dir = tempfile.mkdtemp(prefix="test_python_")
tmp_dir = os.path.abspath(tmp_dir)
try:
- retcode = self._run_process(worker_job, stdout_file, tmp_dir)
+ retcode = self._run_process(worker_runtests, stdout_file, tmp_dir)
finally:
tmp_files = os.listdir(tmp_dir)
os_helper.rmtree(tmp_dir)
else:
- retcode = self._run_process(worker_job, stdout_file)
+ retcode = self._run_process(worker_runtests, stdout_file)
tmp_files = ()
stdout_file.seek(0)
sys.path.insert(0, os.path.abspath(testdir))
-def setup_tests(runtests, ns):
+def setup_tests(runtests):
try:
stderr_fd = sys.__stderr__.fileno()
except (ValueError, AttributeError):
if runtests.hunt_refleak:
unittest.BaseTestSuite._cleanup = False
- if ns.memlimit is not None:
- support.set_memlimit(ns.memlimit)
+ if runtests.memory_limit is not None:
+ support.set_memlimit(runtests.memory_limit)
- if ns.threshold is not None:
- gc.set_threshold(ns.threshold)
+ if runtests.gc_threshold is not None:
+ gc.set_threshold(runtests.gc_threshold)
support.suppress_msvcrt_asserts(runtests.verbose and runtests.verbose >= 2)
- support.use_resources = ns.use_resources
+ support.use_resources = runtests.use_resources
if hasattr(sys, 'addaudithook'):
# Add an auditing hook for all tests to ensure PySys_Audit is tested
MAX_Py_ssize_t = sys.maxsize
-def set_memlimit(limit):
- global max_memuse
- global real_max_memuse
+def _parse_memlimit(limit: str) -> int:
sizes = {
'k': 1024,
'm': _1M,
'g': _1G,
't': 1024*_1G,
}
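+ # Note: with re.VERBOSE, the literal space before the unit group is ignored,
+ # so inputs like '2g' still match.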
- m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
+ m = re.match(r'(\d+(?:\.\d+)?) (K|M|G|T)b?$', limit,
re.IGNORECASE | re.VERBOSE)
if m is None:
- raise ValueError('Invalid memory limit %r' % (limit,))
- memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
- real_max_memuse = memlimit
- if memlimit > MAX_Py_ssize_t:
- memlimit = MAX_Py_ssize_t
+ raise ValueError(f'Invalid memory limit: {limit!r}')
+ return int(float(m.group(1)) * sizes[m.group(2).lower()])
+
+def set_memlimit(limit: str) -> None:
+ global max_memuse
+ global real_max_memuse
+ memlimit = _parse_memlimit(limit)
if memlimit < _2G - 1:
- raise ValueError('Memory limit %r too low to be useful' % (limit,))
+ raise ValueError(f'Memory limit {limit!r} too low to be useful')
+
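+ # Keep the requested value in real_max_memuse; max_memuse is capped
+ # at MAX_Py_ssize_t (sys.maxsize).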
+ real_max_memuse = memlimit
+ memlimit = min(memlimit, MAX_Py_ssize_t)
max_memuse = memlimit
+
class _MemoryWatchdog:
"""An object which periodically watches the process' memory consumption
and prints it out.
else:
self.fail("RecursionError was not raised")
- #self.assertEqual(available, 2)
+ def test_parse_memlimit(self):
+ parse = support._parse_memlimit
+ KiB = 1024
+ MiB = KiB * 1024
+ GiB = MiB * 1024
+ TiB = GiB * 1024
+ self.assertEqual(parse('0k'), 0)
+ self.assertEqual(parse('3k'), 3 * KiB)
+ self.assertEqual(parse('2.4m'), int(2.4 * MiB))
+ self.assertEqual(parse('4g'), int(4 * GiB))
+ self.assertEqual(parse('1t'), TiB)
+
+ for limit in ('', '3', '3.5.10k', '10x'):
+ with self.subTest(limit=limit):
+ with self.assertRaises(ValueError):
+ parse(limit)
+
+ def test_set_memlimit(self):
+ _4GiB = 4 * 1024 ** 3
+ TiB = 1024 ** 4
+ old_max_memuse = support.max_memuse
+ old_real_max_memuse = support.real_max_memuse
+ try:
+ if sys.maxsize > 2**32:
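+ # 64-bit build: a 4 GiB limit fits in Py_ssize_t, so max_memuse keeps the requested value.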
+ support.set_memlimit('4g')
+ self.assertEqual(support.max_memuse, _4GiB)
+ self.assertEqual(support.real_max_memuse, _4GiB)
+
+ big = 2**100 // TiB
+ support.set_memlimit(f'{big}t')
+ self.assertEqual(support.max_memuse, sys.maxsize)
+ self.assertEqual(support.real_max_memuse, big * TiB)
+ else:
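+ # 32-bit build: 4 GiB exceeds sys.maxsize, so max_memuse is capped
+ # while real_max_memuse keeps the full requested value.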
+ support.set_memlimit('4g')
+ self.assertEqual(support.max_memuse, sys.maxsize)
+ self.assertEqual(support.real_max_memuse, _4GiB)
+ finally:
+ support.max_memuse = old_max_memuse
+ support.real_max_memuse = old_real_max_memuse
# XXX - the following is a list of untested APIs
# make_legacy_pyc
# EnvironmentVarGuard
# transient_internet
# run_with_locale
- # set_memlimit
# bigmemtest
# precisionbigmemtest
# bigaddrspacetest