+import contextlib
import dataclasses
import faulthandler
import os.path
import threading
import time
import traceback
-from typing import Literal
+from typing import Literal, TextIO
from test import support
from test.support import os_helper
from .logger import Logger
from .result import TestResult, State
from .results import TestResults
-from .runtests import RunTests
+from .runtests import RunTests, JsonFile, JsonFileType
from .single import PROGRESS_MIN_TIME
from .utils import (
- StrPath, StrJSON, TestName, MS_WINDOWS,
+ StrPath, StrJSON, TestName, MS_WINDOWS, TMP_PREFIX,
format_duration, print_warning, count, plural)
from .worker import create_worker_process, USE_PROCESS_GROUP
+class WorkerError(Exception):
+ def __init__(self,
+ test_name: TestName,
+ err_msg: str | None,
+ stdout: str | None,
+ state: str = State.MULTIPROCESSING_ERROR):
+ result = TestResult(test_name, state=state)
+ self.mp_result = MultiprocessResult(result, stdout, err_msg)
+ super().__init__()
+
+
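# --- Editorial example (not part of the patch) --------------------------------
# A minimal, self-contained sketch of the "exception carries the result" pattern
# that WorkerError implements above.  FakeResult/FakeMpResult/FakeWorkerError are
# simplified stand-ins for TestResult, MultiprocessResult and WorkerError.
import dataclasses

@dataclasses.dataclass
class FakeResult:                      # stand-in for TestResult
    test_name: str
    state: str

@dataclasses.dataclass
class FakeMpResult:                    # stand-in for MultiprocessResult
    result: FakeResult
    stdout: str | None
    err_msg: str | None

class FakeWorkerError(Exception):
    def __init__(self, test_name, err_msg, stdout, state="MULTIPROCESSING_ERROR"):
        self.mp_result = FakeMpResult(FakeResult(test_name, state), stdout, err_msg)
        super().__init__()

def run_one(test_name: str) -> FakeMpResult:
    # any helper deep in the call stack can bail out with a single raise ...
    raise FakeWorkerError(test_name, "empty JSON", stdout="")

try:
    mp_result = run_one("test_os")
except FakeWorkerError as exc:
    # ... and the single caller turns the exception back into a result
    mp_result = exc.mp_result
print(mp_result.err_msg)               # -> empty JSON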
class WorkerThread(threading.Thread):
def __init__(self, worker_id: int, runner: "RunWorkers") -> None:
super().__init__()
self.output = runner.output
self.timeout = runner.worker_timeout
self.log = runner.log
- self.current_test_name = None
+ self.test_name = None
self.start_time = None
self._popen = None
self._killed = False
info.append("running")
else:
info.append('stopped')
- test = self.current_test_name
+ test = self.test_name
if test:
info.append(f'test={test}')
popen = self._popen
self._stopped = True
self._kill()
- def mp_result_error(
- self,
- test_result: TestResult,
- stdout: str | None = None,
- err_msg=None
- ) -> MultiprocessResult:
- return MultiprocessResult(test_result, stdout, err_msg)
-
- def _run_process(self, runtests: RunTests, output_fd: int, json_fd: int,
+ def _run_process(self, runtests: RunTests, output_fd: int,
                     tmp_dir: StrPath | None = None) -> int | None:
- try:
- popen = create_worker_process(runtests, output_fd, json_fd,
- tmp_dir)
-
- self._killed = False
- self._popen = popen
- except:
- self.current_test_name = None
- raise
+ popen = create_worker_process(runtests, output_fd, tmp_dir)
+ self._popen = popen
+ self._killed = False
try:
if self._stopped:
finally:
self._wait_completed()
self._popen = None
- self.current_test_name = None
- def _runtest(self, test_name: TestName) -> MultiprocessResult:
- self.current_test_name = test_name
+ def create_stdout(self, stack: contextlib.ExitStack) -> TextIO:
+        """Create stdout temporary file (file descriptor)."""
if MS_WINDOWS:
# gh-95027: When stdout is not a TTY, Python uses the ANSI code
else:
encoding = sys.stdout.encoding
+ # gh-94026: Write stdout+stderr to a tempfile as workaround for
+ # non-blocking pipes on Emscripten with NodeJS.
+ stdout_file = tempfile.TemporaryFile('w+', encoding=encoding)
+ stack.enter_context(stdout_file)
+ return stdout_file
+
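# --- Editorial example (not part of the patch) --------------------------------
# Why create_stdout() uses a temporary file rather than a pipe: the parent passes
# the file's descriptor as the child's stdout/stderr, waits, then rewinds and
# reads.  This sidesteps the non-blocking pipe limitation on Emscripten/NodeJS
# mentioned in gh-94026.  Minimal sketch, standard library only.
import subprocess
import sys
import tempfile

with tempfile.TemporaryFile('w+', encoding='utf-8') as stdout_file:
    proc = subprocess.Popen(
        [sys.executable, '-c', 'print("hello from worker")'],
        stdout=stdout_file,                 # child writes into the temp file
        stderr=subprocess.STDOUT)
    proc.wait()
    stdout_file.seek(0)                     # rewind before reading in the parent
    print(stdout_file.read().strip())       # -> hello from worker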
+ def create_json_file(self, stack: contextlib.ExitStack) -> tuple[JsonFile, TextIO | None]:
+ """Create JSON file."""
+
+ json_file_use_filename = self.runtests.json_file_use_filename()
+ if json_file_use_filename:
+ # create an empty file to make the creation atomic
+ # (to prevent races with other worker threads)
+ prefix = TMP_PREFIX + 'json_'
+ json_fd, json_filename = tempfile.mkstemp(prefix=prefix)
+ os.close(json_fd)
+
+ stack.callback(os_helper.unlink, json_filename)
+ json_file = JsonFile(json_filename, JsonFileType.FILENAME)
+ json_tmpfile = None
+ else:
+ json_tmpfile = tempfile.TemporaryFile('w+', encoding='utf8')
+ stack.enter_context(json_tmpfile)
+
+ json_fd = json_tmpfile.fileno()
+ if MS_WINDOWS:
+ json_handle = msvcrt.get_osfhandle(json_fd)
+ json_file = JsonFile(json_handle,
+ JsonFileType.WINDOWS_HANDLE)
+ else:
+ json_file = JsonFile(json_fd, JsonFileType.UNIX_FD)
+ return (json_file, json_tmpfile)
+
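# --- Editorial example (not part of the patch) --------------------------------
# The two transport modes create_json_file() chooses between, reduced to a
# helper: either a real file descriptor that the child process will inherit, or
# a named temporary file for platforms/configurations where fd inheritance is
# not an option (Emscripten, WASI, or a custom --python command).  Names below
# are illustrative.
import contextlib
import os
import tempfile

def make_json_channel(use_filename: bool, stack: contextlib.ExitStack):
    if use_filename:
        # mkstemp() creates the file atomically, so two worker threads can
        # never end up with the same name
        fd, filename = tempfile.mkstemp(prefix='json_')
        os.close(fd)
        stack.callback(os.unlink, filename)
        return ('filename', filename)
    else:
        tmpfile = tempfile.TemporaryFile('w+', encoding='utf-8')
        stack.enter_context(tmpfile)
        return ('fd', tmpfile.fileno())

with contextlib.ExitStack() as stack:
    print(make_json_channel(True, stack))    # ('filename', '/tmp/json_xxxxxxxx')
    print(make_json_channel(False, stack))   # ('fd', <some integer>)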
+ def create_worker_runtests(self, test_name: TestName, json_file: JsonFile) -> RunTests:
+ """Create the worker RunTests."""
+
tests = (test_name,)
if self.runtests.rerun:
match_tests = self.runtests.get_match_tests(test_name)
else:
match_tests = None
- err_msg = None
- # gh-94026: Write stdout+stderr to a tempfile as workaround for
- # non-blocking pipes on Emscripten with NodeJS.
- with (tempfile.TemporaryFile('w+', encoding=encoding) as stdout_file,
- tempfile.TemporaryFile('w+', encoding='utf8') as json_file):
- stdout_fd = stdout_file.fileno()
- json_fd = json_file.fileno()
- if MS_WINDOWS:
- json_fd = msvcrt.get_osfhandle(json_fd)
-
- kwargs = {}
- if match_tests:
- kwargs['match_tests'] = match_tests
- worker_runtests = self.runtests.copy(
- tests=tests,
- json_fd=json_fd,
- **kwargs)
-
- # gh-93353: Check for leaked temporary files in the parent process,
- # since the deletion of temporary files can happen late during
- # Python finalization: too late for libregrtest.
- if not support.is_wasi:
- # Don't check for leaked temporary files and directories if Python is
- # run on WASI. WASI don't pass environment variables like TMPDIR to
- # worker processes.
- tmp_dir = tempfile.mkdtemp(prefix="test_python_")
- tmp_dir = os.path.abspath(tmp_dir)
- try:
- retcode = self._run_process(worker_runtests,
- stdout_fd, json_fd, tmp_dir)
- finally:
- tmp_files = os.listdir(tmp_dir)
- os_helper.rmtree(tmp_dir)
- else:
+ kwargs = {}
+ if match_tests:
+ kwargs['match_tests'] = match_tests
+ return self.runtests.copy(
+ tests=tests,
+ json_file=json_file,
+ **kwargs)
+
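# --- Editorial example (not part of the patch) --------------------------------
# create_worker_runtests() derives a one-test RunTests from the frozen master
# configuration.  The same "copy with overrides" idea, shown here on a
# simplified stand-in with dataclasses.replace(); the real RunTests.copy() goes
# through dataclasses.asdict() instead.
import dataclasses

@dataclasses.dataclass(frozen=True)
class MiniRunTests:                          # stand-in for RunTests
    tests: tuple[str, ...]
    json_file: object | None = None
    match_tests: tuple[str, ...] | None = None

base = MiniRunTests(tests=('test_os', 'test_sys'))
worker = dataclasses.replace(base, tests=('test_os',), json_file='worker.json')
print(worker.tests, worker.json_file)   # -> ('test_os',) worker.json
print(base.tests)                       # master config unchanged: ('test_os', 'test_sys')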
+ def run_tmp_files(self, worker_runtests: RunTests,
+                      stdout_fd: int) -> tuple[int, list[StrPath]]:
+ # gh-93353: Check for leaked temporary files in the parent process,
+ # since the deletion of temporary files can happen late during
+ # Python finalization: too late for libregrtest.
+ if not support.is_wasi:
+ # Don't check for leaked temporary files and directories if Python is
+            # run on WASI. WASI doesn't pass environment variables like TMPDIR to
+ # worker processes.
+ tmp_dir = tempfile.mkdtemp(prefix="test_python_")
+ tmp_dir = os.path.abspath(tmp_dir)
+            try:
+                retcode = self._run_process(worker_runtests,
+                                            stdout_fd, tmp_dir)
+            finally:
+                tmp_files = os.listdir(tmp_dir)
+                os_helper.rmtree(tmp_dir)
+        else:
+            retcode = self._run_process(worker_runtests, stdout_fd)
+            tmp_files = []
-            retcode = self._run_process(worker_runtests,
-                                        stdout_fd, json_fd)
-            tmp_files = ()
-            stdout_file.seek(0)
- try:
- stdout = stdout_file.read().strip()
- except Exception as exc:
- # gh-101634: Catch UnicodeDecodeError if stdout cannot be
- # decoded from encoding
- err_msg = f"Cannot read process stdout: {exc}"
- result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
- return self.mp_result_error(result, err_msg=err_msg)
+ return (retcode, tmp_files)
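# --- Editorial example (not part of the patch) --------------------------------
# The leak check performed by run_tmp_files(), reduced to its essence: the parent
# owns a private temporary directory, points the child at it through TMPDIR/TMP/
# TEMP, and anything the child leaves behind shows up in os.listdir() afterwards.
# Minimal sketch; the real code routes tmp_dir through create_worker_process().
import os
import shutil
import subprocess
import sys
import tempfile

tmp_dir = os.path.abspath(tempfile.mkdtemp(prefix="test_python_"))
try:
    env = dict(os.environ, TMPDIR=tmp_dir, TMP=tmp_dir, TEMP=tmp_dir)
    subprocess.run(
        [sys.executable, '-c', 'import tempfile; tempfile.mkstemp()'],  # leaks a file
        env=env, check=True)
    print("leaked:", os.listdir(tmp_dir))    # one stray tmpXXXXXXXX file
finally:
    shutil.rmtree(tmp_dir)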
- try:
- # deserialize run_tests_worker() output
- json_file.seek(0)
- worker_json: StrJSON = json_file.read()
- if worker_json:
- result = TestResult.from_json(worker_json)
- else:
- err_msg = "empty JSON"
- except Exception as exc:
- # gh-101634: Catch UnicodeDecodeError if stdout cannot be
- # decoded from encoding
- err_msg = f"Fail to read or parser worker process JSON: {exc}"
- result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
- return self.mp_result_error(result, stdout, err_msg=err_msg)
-
- if retcode is None:
- result = TestResult(test_name, state=State.TIMEOUT)
- return self.mp_result_error(result, stdout)
+ def read_stdout(self, stdout_file: TextIO) -> str:
+ stdout_file.seek(0)
+ try:
+ return stdout_file.read().strip()
+ except Exception as exc:
+ # gh-101634: Catch UnicodeDecodeError if stdout cannot be
+ # decoded from encoding
+ raise WorkerError(self.test_name,
+ f"Cannot read process stdout: {exc}", None)
+
+ def read_json(self, json_file: JsonFile, json_tmpfile: TextIO | None,
+ stdout: str) -> TestResult:
+ try:
+ if json_tmpfile is not None:
+ json_tmpfile.seek(0)
+ worker_json: StrJSON = json_tmpfile.read()
+ else:
+ with json_file.open(encoding='utf8') as json_fp:
+ worker_json: StrJSON = json_fp.read()
+ except Exception as exc:
+            # gh-101634: Catch UnicodeDecodeError if the JSON file cannot be
+            # decoded from its encoding
+ err_msg = f"Failed to read worker process JSON: {exc}"
+ raise WorkerError(self.test_name, err_msg, stdout,
+ state=State.MULTIPROCESSING_ERROR)
+
+ if not worker_json:
+ raise WorkerError(self.test_name, "empty JSON", stdout)
- if retcode != 0:
- err_msg = "Exit code %s" % retcode
+ try:
+ return TestResult.from_json(worker_json)
+ except Exception as exc:
+            # Catch exceptions raised if the JSON cannot be parsed
+            # (invalid or truncated JSON, etc.)
+ err_msg = f"Failed to parse worker process JSON: {exc}"
+ raise WorkerError(self.test_name, err_msg, stdout,
+ state=State.MULTIPROCESSING_ERROR)
+
+ def _runtest(self, test_name: TestName) -> MultiprocessResult:
+ with contextlib.ExitStack() as stack:
+ stdout_file = self.create_stdout(stack)
+ json_file, json_tmpfile = self.create_json_file(stack)
+ worker_runtests = self.create_worker_runtests(test_name, json_file)
+
+ retcode, tmp_files = self.run_tmp_files(worker_runtests,
+ stdout_file.fileno())
+
+ stdout = self.read_stdout(stdout_file)
- if err_msg:
- result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
- return self.mp_result_error(result, stdout, err_msg)
+ if retcode is None:
+ raise WorkerError(self.test_name, None, stdout, state=State.TIMEOUT)
+
+ result = self.read_json(json_file, json_tmpfile, stdout)
+
+ if retcode != 0:
+ raise WorkerError(self.test_name, f"Exit code {retcode}", stdout)
if tmp_files:
msg = (f'\n\n'
break
self.start_time = time.monotonic()
- mp_result = self._runtest(test_name)
+ self.test_name = test_name
+ try:
+ mp_result = self._runtest(test_name)
+ except WorkerError as exc:
+ mp_result = exc.mp_result
+ finally:
+ self.test_name = None
mp_result.result.duration = time.monotonic() - self.start_time
self.output.put((False, mp_result))
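# --- Editorial example (not part of the patch) --------------------------------
# The hand-off at the end of run(): each worker thread pushes its results onto a
# shared queue and the main thread consumes them.  The sentinel convention below
# is purely illustrative; the real WorkerThread/RunWorkers protocol differs in
# detail, but the producer/consumer shape is the same.
import queue
import threading

output: "queue.Queue[tuple[bool, str | None]]" = queue.Queue()

def fake_worker(name: str) -> None:
    output.put((False, f"result of {name}"))   # one entry per finished test
    output.put((True, None))                   # this worker is done

threads = [threading.Thread(target=fake_worker, args=(f"test_{i}",))
           for i in range(2)]
for thread in threads:
    thread.start()
alive = len(threads)
while alive:
    is_exit, payload = output.get()
    if is_exit:
        alive -= 1
    else:
        print(payload)
for thread in threads:
    thread.join()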
def get_running(workers: list[WorkerThread]) -> list[str]:
running = []
for worker in workers:
- current_test_name = worker.current_test_name
- if not current_test_name:
+ test_name = worker.test_name
+ if not test_name:
continue
dt = time.monotonic() - worker.start_time
if dt >= PROGRESS_MIN_TIME:
- text = '%s (%s)' % (current_test_name, format_duration(dt))
+ text = f'{test_name} ({format_duration(dt)})'
running.append(text)
if not running:
return None
+import contextlib
import dataclasses
import json
+import os
+import subprocess
from typing import Any
+from test import support
+
from .utils import (
StrPath, StrJSON, TestTuple, FilterTuple, FilterDict)
+class JsonFileType:
+ UNIX_FD = "UNIX_FD"
+ WINDOWS_HANDLE = "WINDOWS_HANDLE"
+ FILENAME = "FILENAME"
+
+
+@dataclasses.dataclass(slots=True, frozen=True)
+class JsonFile:
+ # See RunTests.json_file_use_filename()
+ file: int | StrPath
+ file_type: str
+
+ def configure_subprocess(self, popen_kwargs: dict) -> None:
+ match self.file_type:
+ case JsonFileType.UNIX_FD:
+ # Unix file descriptor
+ popen_kwargs['pass_fds'] = [self.file]
+ case JsonFileType.WINDOWS_HANDLE:
+ # Windows handle
+ startupinfo = subprocess.STARTUPINFO()
+ startupinfo.lpAttributeList = {"handle_list": [self.file]}
+ popen_kwargs['startupinfo'] = startupinfo
+ case JsonFileType.FILENAME:
+                # Filename: nothing to do
+ pass
+
+ @contextlib.contextmanager
+ def inherit_subprocess(self):
+ if self.file_type == JsonFileType.WINDOWS_HANDLE:
+ os.set_handle_inheritable(self.file, True)
+ try:
+ yield
+ finally:
+ os.set_handle_inheritable(self.file, False)
+ else:
+ yield
+
+ def open(self, mode='r', *, encoding):
+ file = self.file
+ if self.file_type == JsonFileType.WINDOWS_HANDLE:
+ import msvcrt
+ # Create a file descriptor from the handle
+ file = msvcrt.open_osfhandle(file, os.O_WRONLY)
+ return open(file, mode, encoding=encoding)
+
+
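# --- Editorial example (not part of the patch) --------------------------------
# JsonFile's UNIX_FD mode end to end: the parent keeps an anonymous temporary
# file, hands its descriptor to the child through pass_fds (the descriptor
# number is preserved in the child), the child wraps that number with open() and
# writes JSON, and the parent rewinds and reads it back.  POSIX-only sketch; on
# Windows the WINDOWS_HANDLE/startupinfo path above plays the equivalent role.
import json
import subprocess
import sys
import tempfile

with tempfile.TemporaryFile('w+', encoding='utf-8') as json_tmpfile:
    json_fd = json_tmpfile.fileno()
    child_code = (
        "import json\n"
        f"fp = open({json_fd}, 'w', encoding='utf-8', closefd=False)\n"
        "json.dump({'state': 'PASSED'}, fp)\n"
        "fp.flush()\n"
    )
    subprocess.run([sys.executable, '-c', child_code],
                   pass_fds=[json_fd], check=True)
    json_tmpfile.seek(0)
    print(json.load(json_tmpfile))       # -> {'state': 'PASSED'}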
@dataclasses.dataclass(slots=True, frozen=True)
class HuntRefleak:
warmups: int
    python_cmd: tuple[str, ...] | None
randomize: bool
random_seed: int | None
- # On Unix, it's a file descriptor.
- # On Windows, it's a handle.
- json_fd: int | None
+ json_file: JsonFile | None
def copy(self, **override):
state = dataclasses.asdict(self)
def from_json(worker_json: StrJSON) -> 'RunTests':
return json.loads(worker_json, object_hook=_decode_runtests)
+ def json_file_use_filename(self) -> bool:
+ # json_file type depends on the platform:
+ # - Unix: file descriptor (int)
+ # - Windows: handle (int)
+ # - Emscripten/WASI or if --python is used: filename (str)
+ return (
+ bool(self.python_cmd)
+ or support.is_emscripten
+ or support.is_wasi
+ )
+
class _EncodeRunTests(json.JSONEncoder):
def default(self, o: Any) -> dict[str, Any]:
data.pop('__runtests__')
if data['hunt_refleak']:
data['hunt_refleak'] = HuntRefleak(**data['hunt_refleak'])
+ if data['json_file']:
+ data['json_file'] = JsonFile(**data['json_file'])
return RunTests(**data)
else:
return data
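# --- Editorial example (not part of the patch) --------------------------------
# The serialization round trip used by _EncodeRunTests/_decode_runtests, shown on
# a simplified stand-in dataclass.  A marker key ('__mini__' here, '__runtests__'
# in the real code) tells the object_hook which dicts to turn back into objects.
import dataclasses
import json

@dataclasses.dataclass(frozen=True)
class Mini:
    tests: tuple[str, ...]

class _EncodeMini(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, Mini):
            data = dataclasses.asdict(o)
            data['__mini__'] = True
            return data
        return super().default(o)

def _decode_mini(data: dict):
    if data.pop('__mini__', False):
        data['tests'] = tuple(data['tests'])   # JSON round-trips tuples as lists
        return Mini(**data)
    return data

blob = json.dumps(Mini(tests=('test_os',)), cls=_EncodeMini)
print(json.loads(blob, object_hook=_decode_mini))   # -> Mini(tests=('test_os',))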
from test.support import os_helper
from .setup import setup_process, setup_test_dir
-from .runtests import RunTests
+from .runtests import RunTests, JsonFile
from .single import run_single_test
from .utils import (
- StrPath, StrJSON, FilterTuple, MS_WINDOWS,
+ StrPath, StrJSON, FilterTuple,
get_temp_dir, get_work_dir, exit_timeout)
USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))
-def create_worker_process(runtests: RunTests,
- output_fd: int, json_fd: int,
+def create_worker_process(runtests: RunTests, output_fd: int,
tmp_dir: StrPath | None = None) -> subprocess.Popen:
python_cmd = runtests.python_cmd
worker_json = runtests.as_json()
close_fds=True,
cwd=work_dir,
)
- if not MS_WINDOWS:
- kwargs['pass_fds'] = [json_fd]
- else:
- startupinfo = subprocess.STARTUPINFO()
- startupinfo.lpAttributeList = {"handle_list": [json_fd]}
- kwargs['startupinfo'] = startupinfo
    if USE_PROCESS_GROUP:
        kwargs['start_new_session'] = True
-
- if MS_WINDOWS:
- os.set_handle_inheritable(json_fd, True)
- try:
+
+ # Pass json_file to the worker process
+ json_file = runtests.json_file
+ json_file.configure_subprocess(kwargs)
+
+ with json_file.inherit_subprocess():
return subprocess.Popen(cmd, **kwargs)
- finally:
- if MS_WINDOWS:
- os.set_handle_inheritable(json_fd, False)
def worker_process(worker_json: StrJSON) -> NoReturn:
runtests = RunTests.from_json(worker_json)
test_name = runtests.tests[0]
match_tests: FilterTuple | None = runtests.match_tests
- json_fd: int = runtests.json_fd
-
- if MS_WINDOWS:
- import msvcrt
- json_fd = msvcrt.open_osfhandle(json_fd, os.O_WRONLY)
-
+ # json_file type depends on the platform:
+ # - Unix: file descriptor (int)
+ # - Windows: handle (int)
+ # - Emscripten/WASI or if --python is used: filename (str)
+ json_file: JsonFile = runtests.json_file
setup_test_dir(runtests.test_dir)
setup_process()
result = run_single_test(test_name, runtests)
- with open(json_fd, 'w', encoding='utf-8') as json_file:
- result.write_json_into(json_file)
+ with json_file.open('w', encoding='utf-8') as json_fp:
+ result.write_json_into(json_fp)
sys.exit(0)
import platform
import random
import re
+import shlex
import subprocess
import sys
import sysconfig
parser = re.finditer(regex, output, re.MULTILINE)
return list(match.group(1) for match in parser)
- def check_executed_tests(self, output, tests, skipped=(), failed=(),
+ def check_executed_tests(self, output, tests, *, stats,
+ skipped=(), failed=(),
env_changed=(), omitted=(),
rerun=None, run_no_tests=(),
resource_denied=(),
- randomize=False, interrupted=False,
+ randomize=False, parallel=False, interrupted=False,
fail_env_changed=False,
- *, stats, forever=False, filtered=False):
+ forever=False, filtered=False):
if isinstance(tests, str):
tests = [tests]
if isinstance(skipped, str):
run_no_tests = [run_no_tests]
if isinstance(stats, int):
stats = TestStats(stats)
+ if parallel:
+ randomize = True
rerun_failed = []
if rerun is not None:
tests = [crash_test]
output = self.run_tests("-j2", *tests, exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, tests, failed=crash_test,
- randomize=True, stats=0)
+ parallel=True, stats=0)
def parse_methods(self, output):
regex = re.compile("^(test[^ ]+).*ok$", flags=re.MULTILINE)
self.check_executed_tests(output, testnames,
env_changed=testnames,
fail_env_changed=True,
- randomize=True,
+ parallel=True,
stats=len(testnames))
for testname in testnames:
self.assertIn(f"Warning -- {testname} leaked temporary "
exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, [testname],
failed=[testname],
- randomize=True,
+ parallel=True,
stats=0)
def test_doctest(self):
exitcode=EXITCODE_BAD_TEST)
self.check_executed_tests(output, [testname],
failed=[testname],
- randomize=True,
+ parallel=True,
stats=TestStats(1, 1, 0))
def _check_random_seed(self, run_workers: bool):
def test_random_seed_workers(self):
self._check_random_seed(run_workers=True)
+ def test_python_command(self):
+ code = textwrap.dedent(r"""
+ import sys
+ import unittest
+
+ class WorkerTests(unittest.TestCase):
+ def test_dev_mode(self):
+ self.assertTrue(sys.flags.dev_mode)
+ """)
+ tests = [self.create_test(code=code) for _ in range(3)]
+
+ # Custom Python command: "python -X dev"
+ python_cmd = [sys.executable, '-X', 'dev']
+ # test.libregrtest.cmdline uses shlex.split() to parse the Python
+ # command line string
+ python_cmd = shlex.join(python_cmd)
+
+ output = self.run_tests("--python", python_cmd, "-j0", *tests)
+ self.check_executed_tests(output, tests,
+ stats=len(tests), parallel=True)
+
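# --- Editorial example (not part of the patch) --------------------------------
# The shlex round trip the new test depends on: --python takes one string, so the
# test builds it with shlex.join() and the command-line parser recovers the argv
# list with shlex.split(), quoting included.
import shlex
import sys

python_cmd = [sys.executable, '-X', 'dev']
as_string = shlex.join(python_cmd)          # e.g. '/usr/bin/python3 -X dev'
assert shlex.split(as_string) == python_cmd
print(as_string)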
class TestUtils(unittest.TestCase):
def test_format_duration(self):