gh-109276: libregrtest: WASM use filename for JSON (#109340)
author    Victor Stinner <vstinner@python.org>
          Tue, 12 Sep 2023 22:41:25 +0000 (00:41 +0200)
committer GitHub <noreply@github.com>
          Tue, 12 Sep 2023 22:41:25 +0000 (00:41 +0200)
On Emscripten and WASI platforms, or if the --python command line option
is used, libregrtest now uses a filename for the JSON file.

Emscripten and WASI buildbot workers run the main test process with a
different (Linux) Python, which spawns Emscripten/WASI worker processes
using the command specified by the --python command line option. Passing
a file descriptor from the parent process to the child process doesn't
work in this case.
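
The workaround is to tell the worker where to write its JSON rather than
which descriptor to write to, so any launcher can spawn it. A minimal
sketch of that pattern, assuming a hypothetical worker script that takes
the JSON path as its first argument (this is not libregrtest's actual
code):

    import json
    import os
    import subprocess
    import tempfile

    def run_worker_with_filename(python_cmd: list[str], worker_script: str) -> dict:
        """Run worker_script under python_cmd (e.g. the --python command)
        and collect its JSON output through a named temporary file."""
        fd, json_path = tempfile.mkstemp(prefix='test_python_json_')
        os.close(fd)  # create the file up front; the worker re-opens it by name
        try:
            # The worker is told *where* to write, not which fd to write to,
            # so fd inheritance across the launcher is never needed.
            subprocess.run([*python_cmd, worker_script, json_path], check=True)
            with open(json_path, encoding='utf-8') as fp:
                return json.load(fp)
        finally:
            os.unlink(json_path)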

* Add JsonFile and JsonFileType classes (a usage sketch follows this list).
* Add the RunTests.json_file_use_filename() method.
* Add a test to test_regrtest for the --python command line option.
* test_regrtest: add a parallel=False parameter to check_executed_tests().
* Split the long WorkerThread._runtest() method into sub-functions.
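
To make the new classes concrete, below is a hedged, Unix-only sketch of
how a parent process can hand a worker the JSON channel through the
JsonFile API added in runtests.py; the inline -c child is a stand-in for
libregrtest's real worker command:

    import subprocess
    import sys
    import tempfile

    from test.libregrtest.runtests import JsonFile, JsonFileType

    # Parent side, Unix: hand the worker an inherited file descriptor.
    # On Emscripten/WASI (or with --python), libregrtest picks
    # JsonFileType.FILENAME instead and the worker opens the path by name.
    with tempfile.TemporaryFile('w+', encoding='utf8') as json_tmpfile:
        json_file = JsonFile(json_tmpfile.fileno(), JsonFileType.UNIX_FD)

        popen_kwargs = {}
        json_file.configure_subprocess(popen_kwargs)  # sets pass_fds=[fd] on Unix

        # Stand-in child: the real worker runs test.libregrtest.worker and
        # writes its TestResult through json_file.open('w', encoding='utf-8').
        child_code = (
            "import json, os; "
            f"os.write({json_tmpfile.fileno()}, json.dumps({{'state': 'ok'}}).encode())"
        )
        with json_file.inherit_subprocess():  # only matters for Windows handles
            subprocess.run([sys.executable, '-c', child_code],
                           **popen_kwargs, check=True)

        json_tmpfile.seek(0)
        print(json_tmpfile.read())  # {"state": "ok"}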

Lib/test/libregrtest/main.py
Lib/test/libregrtest/run_workers.py
Lib/test/libregrtest/runtests.py
Lib/test/libregrtest/utils.py
Lib/test/libregrtest/worker.py
Lib/test/test_regrtest.py

Lib/test/libregrtest/main.py
index dd9a10764b075a767a1f48ebb64b8a62a3f68f6f..ba493ae1796fe06c4588327771ed4e30ad7cd489 100644
@@ -406,7 +406,7 @@ class Regrtest:
             python_cmd=self.python_cmd,
             randomize=self.randomize,
             random_seed=self.random_seed,
-            json_fd=None,
+            json_file=None,
         )
 
     def _run_tests(self, selected: TestTuple, tests: TestList | None) -> int:
Lib/test/libregrtest/run_workers.py
index eaca0af17ea13a28e22573868f0ad3e716fe8970..a6bebccd0c75850dd8a515546e5bf50d11325eff 100644
@@ -1,3 +1,4 @@
+import contextlib
 import dataclasses
 import faulthandler
 import os.path
@@ -9,7 +10,7 @@ import tempfile
 import threading
 import time
 import traceback
-from typing import Literal
+from typing import Literal, TextIO
 
 from test import support
 from test.support import os_helper
@@ -17,10 +18,10 @@ from test.support import os_helper
 from .logger import Logger
 from .result import TestResult, State
 from .results import TestResults
-from .runtests import RunTests
+from .runtests import RunTests, JsonFile, JsonFileType
 from .single import PROGRESS_MIN_TIME
 from .utils import (
-    StrPath, StrJSON, TestName, MS_WINDOWS,
+    StrPath, StrJSON, TestName, MS_WINDOWS, TMP_PREFIX,
     format_duration, print_warning, count, plural)
 from .worker import create_worker_process, USE_PROCESS_GROUP
 
@@ -83,6 +84,17 @@ class ExitThread(Exception):
     pass
 
 
+class WorkerError(Exception):
+    def __init__(self,
+                 test_name: TestName,
+                 err_msg: str | None,
+                 stdout: str | None,
+                 state: str = State.MULTIPROCESSING_ERROR):
+        result = TestResult(test_name, state=state)
+        self.mp_result = MultiprocessResult(result, stdout, err_msg)
+        super().__init__()
+
+
 class WorkerThread(threading.Thread):
     def __init__(self, worker_id: int, runner: "RunWorkers") -> None:
         super().__init__()
@@ -92,7 +104,7 @@ class WorkerThread(threading.Thread):
         self.output = runner.output
         self.timeout = runner.worker_timeout
         self.log = runner.log
-        self.current_test_name = None
+        self.test_name = None
         self.start_time = None
         self._popen = None
         self._killed = False
@@ -104,7 +116,7 @@ class WorkerThread(threading.Thread):
             info.append("running")
         else:
             info.append('stopped')
-        test = self.current_test_name
+        test = self.test_name
         if test:
             info.append(f'test={test}')
         popen = self._popen
@@ -147,25 +159,11 @@ class WorkerThread(threading.Thread):
         self._stopped = True
         self._kill()
 
-    def mp_result_error(
-        self,
-        test_result: TestResult,
-        stdout: str | None = None,
-        err_msg=None
-    ) -> MultiprocessResult:
-        return MultiprocessResult(test_result, stdout, err_msg)
-
-    def _run_process(self, runtests: RunTests, output_fd: int, json_fd: int,
+    def _run_process(self, runtests: RunTests, output_fd: int,
                      tmp_dir: StrPath | None = None) -> int:
-        try:
-            popen = create_worker_process(runtests, output_fd, json_fd,
-                                          tmp_dir)
-
-            self._killed = False
-            self._popen = popen
-        except:
-            self.current_test_name = None
-            raise
+        popen = create_worker_process(runtests, output_fd, tmp_dir)
+        self._popen = popen
+        self._killed = False
 
         try:
             if self._stopped:
@@ -206,10 +204,9 @@ class WorkerThread(threading.Thread):
         finally:
             self._wait_completed()
             self._popen = None
-            self.current_test_name = None
 
-    def _runtest(self, test_name: TestName) -> MultiprocessResult:
-        self.current_test_name = test_name
+    def create_stdout(self, stack: contextlib.ExitStack) -> TextIO:
+        """Create stdout temporay file (file descriptor)."""
 
         if MS_WINDOWS:
             # gh-95027: When stdout is not a TTY, Python uses the ANSI code
@@ -219,85 +216,135 @@ class WorkerThread(threading.Thread):
         else:
             encoding = sys.stdout.encoding
 
+        # gh-94026: Write stdout+stderr to a tempfile as workaround for
+        # non-blocking pipes on Emscripten with NodeJS.
+        stdout_file = tempfile.TemporaryFile('w+', encoding=encoding)
+        stack.enter_context(stdout_file)
+        return stdout_file
+
+    def create_json_file(self, stack: contextlib.ExitStack) -> tuple[JsonFile, TextIO | None]:
+        """Create JSON file."""
+
+        json_file_use_filename = self.runtests.json_file_use_filename()
+        if json_file_use_filename:
+            # create an empty file to make the creation atomic
+            # (to prevent races with other worker threads)
+            prefix = TMP_PREFIX + 'json_'
+            json_fd, json_filename = tempfile.mkstemp(prefix=prefix)
+            os.close(json_fd)
+
+            stack.callback(os_helper.unlink, json_filename)
+            json_file = JsonFile(json_filename, JsonFileType.FILENAME)
+            json_tmpfile = None
+        else:
+            json_tmpfile = tempfile.TemporaryFile('w+', encoding='utf8')
+            stack.enter_context(json_tmpfile)
+
+            json_fd = json_tmpfile.fileno()
+            if MS_WINDOWS:
+                json_handle = msvcrt.get_osfhandle(json_fd)
+                json_file = JsonFile(json_handle,
+                                     JsonFileType.WINDOWS_HANDLE)
+            else:
+                json_file = JsonFile(json_fd, JsonFileType.UNIX_FD)
+        return (json_file, json_tmpfile)
+
+    def create_worker_runtests(self, test_name: TestName, json_file: JsonFile) -> RunTests:
+        """Create the worker RunTests."""
+
         tests = (test_name,)
         if self.runtests.rerun:
             match_tests = self.runtests.get_match_tests(test_name)
         else:
             match_tests = None
-        err_msg = None
 
-        # gh-94026: Write stdout+stderr to a tempfile as workaround for
-        # non-blocking pipes on Emscripten with NodeJS.
-        with (tempfile.TemporaryFile('w+', encoding=encoding) as stdout_file,
-              tempfile.TemporaryFile('w+', encoding='utf8') as json_file):
-            stdout_fd = stdout_file.fileno()
-            json_fd = json_file.fileno()
-            if MS_WINDOWS:
-                json_fd = msvcrt.get_osfhandle(json_fd)
-
-            kwargs = {}
-            if match_tests:
-                kwargs['match_tests'] = match_tests
-            worker_runtests = self.runtests.copy(
-                tests=tests,
-                json_fd=json_fd,
-                **kwargs)
-
-            # gh-93353: Check for leaked temporary files in the parent process,
-            # since the deletion of temporary files can happen late during
-            # Python finalization: too late for libregrtest.
-            if not support.is_wasi:
-                # Don't check for leaked temporary files and directories if Python is
-                # run on WASI. WASI don't pass environment variables like TMPDIR to
-                # worker processes.
-                tmp_dir = tempfile.mkdtemp(prefix="test_python_")
-                tmp_dir = os.path.abspath(tmp_dir)
-                try:
-                    retcode = self._run_process(worker_runtests,
-                                                stdout_fd, json_fd, tmp_dir)
-                finally:
-                    tmp_files = os.listdir(tmp_dir)
-                    os_helper.rmtree(tmp_dir)
-            else:
+        kwargs = {}
+        if match_tests:
+            kwargs['match_tests'] = match_tests
+        return self.runtests.copy(
+            tests=tests,
+            json_file=json_file,
+            **kwargs)
+
+    def run_tmp_files(self, worker_runtests: RunTests,
+                      stdout_fd: int) -> tuple[int, list[StrPath]]:
+        # gh-93353: Check for leaked temporary files in the parent process,
+        # since the deletion of temporary files can happen late during
+        # Python finalization: too late for libregrtest.
+        if not support.is_wasi:
+            # Don't check for leaked temporary files and directories if Python is
+            # run on WASI. WASI doesn't pass environment variables like TMPDIR to
+            # worker processes.
+            tmp_dir = tempfile.mkdtemp(prefix="test_python_")
+            tmp_dir = os.path.abspath(tmp_dir)
+            try:
                 retcode = self._run_process(worker_runtests,
-                                            stdout_fd, json_fd)
-                tmp_files = ()
-            stdout_file.seek(0)
+                                            stdout_fd, tmp_dir)
+            finally:
+                tmp_files = os.listdir(tmp_dir)
+                os_helper.rmtree(tmp_dir)
+        else:
+            retcode = self._run_process(worker_runtests, stdout_fd)
+            tmp_files = []
 
-            try:
-                stdout = stdout_file.read().strip()
-            except Exception as exc:
-                # gh-101634: Catch UnicodeDecodeError if stdout cannot be
-                # decoded from encoding
-                err_msg = f"Cannot read process stdout: {exc}"
-                result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
-                return self.mp_result_error(result, err_msg=err_msg)
+        return (retcode, tmp_files)
 
-            try:
-                # deserialize run_tests_worker() output
-                json_file.seek(0)
-                worker_json: StrJSON = json_file.read()
-                if worker_json:
-                    result = TestResult.from_json(worker_json)
-                else:
-                    err_msg = "empty JSON"
-            except Exception as exc:
-                # gh-101634: Catch UnicodeDecodeError if stdout cannot be
-                # decoded from encoding
-                err_msg = f"Fail to read or parser worker process JSON: {exc}"
-                result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
-                return self.mp_result_error(result, stdout, err_msg=err_msg)
-
-        if retcode is None:
-            result = TestResult(test_name, state=State.TIMEOUT)
-            return self.mp_result_error(result, stdout)
+    def read_stdout(self, stdout_file: TextIO) -> str:
+        stdout_file.seek(0)
+        try:
+            return stdout_file.read().strip()
+        except Exception as exc:
+            # gh-101634: Catch UnicodeDecodeError if stdout cannot be
+            # decoded from encoding
+            raise WorkerError(self.test_name,
+                              f"Cannot read process stdout: {exc}", None)
+
+    def read_json(self, json_file: JsonFile, json_tmpfile: TextIO | None,
+                  stdout: str) -> TestResult:
+        try:
+            if json_tmpfile is not None:
+                json_tmpfile.seek(0)
+                worker_json: StrJSON = json_tmpfile.read()
+            else:
+                with json_file.open(encoding='utf8') as json_fp:
+                    worker_json: StrJSON = json_fp.read()
+        except Exception as exc:
+            # gh-101634: Catch UnicodeDecodeError if the JSON file content
+            # cannot be decoded from its encoding
+            err_msg = f"Failed to read worker process JSON: {exc}"
+            raise WorkerError(self.test_name, err_msg, stdout,
+                              state=State.MULTIPROCESSING_ERROR)
+
+        if not worker_json:
+            raise WorkerError(self.test_name, "empty JSON", stdout)
 
-        if retcode != 0:
-            err_msg = "Exit code %s" % retcode
+        try:
+            return TestResult.from_json(worker_json)
+        except Exception as exc:
+            # Catch exceptions raised while deserializing the worker JSON
+            err_msg = f"Failed to parse worker process JSON: {exc}"
+            raise WorkerError(self.test_name, err_msg, stdout,
+                              state=State.MULTIPROCESSING_ERROR)
+
+    def _runtest(self, test_name: TestName) -> MultiprocessResult:
+        with contextlib.ExitStack() as stack:
+            stdout_file = self.create_stdout(stack)
+            json_file, json_tmpfile = self.create_json_file(stack)
+            worker_runtests = self.create_worker_runtests(test_name, json_file)
+
+            retcode, tmp_files = self.run_tmp_files(worker_runtests,
+                                                    stdout_file.fileno())
+
+            stdout = self.read_stdout(stdout_file)
 
-        if err_msg:
-            result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR)
-            return self.mp_result_error(result, stdout, err_msg)
+            if retcode is None:
+                raise WorkerError(self.test_name, None, stdout, state=State.TIMEOUT)
+
+            result = self.read_json(json_file, json_tmpfile, stdout)
+
+        if retcode != 0:
+            raise WorkerError(self.test_name, f"Exit code {retcode}", stdout)
 
         if tmp_files:
             msg = (f'\n\n'
@@ -319,7 +366,13 @@ class WorkerThread(threading.Thread):
                     break
 
                 self.start_time = time.monotonic()
-                mp_result = self._runtest(test_name)
+                self.test_name = test_name
+                try:
+                    mp_result = self._runtest(test_name)
+                except WorkerError as exc:
+                    mp_result = exc.mp_result
+                finally:
+                    self.test_name = None
                 mp_result.result.duration = time.monotonic() - self.start_time
                 self.output.put((False, mp_result))
 
@@ -367,12 +420,12 @@ class WorkerThread(threading.Thread):
 def get_running(workers: list[WorkerThread]) -> list[str]:
     running = []
     for worker in workers:
-        current_test_name = worker.current_test_name
-        if not current_test_name:
+        test_name = worker.test_name
+        if not test_name:
             continue
         dt = time.monotonic() - worker.start_time
         if dt >= PROGRESS_MIN_TIME:
-            text = '%s (%s)' % (current_test_name, format_duration(dt))
+            text = f'{test_name} ({format_duration(dt)})'
             running.append(text)
     if not running:
         return None
Lib/test/libregrtest/runtests.py
index 5c68df126e2a8f6b20a0e070b8e964175e74e220..62a0a7e20c7b8b38e053288a23b404928263a959 100644
@@ -1,11 +1,62 @@
+import contextlib
 import dataclasses
 import json
+import os
+import subprocess
 from typing import Any
 
+from test import support
+
 from .utils import (
     StrPath, StrJSON, TestTuple, FilterTuple, FilterDict)
 
 
+class JsonFileType:
+    UNIX_FD = "UNIX_FD"
+    WINDOWS_HANDLE = "WINDOWS_HANDLE"
+    FILENAME = "FILENAME"
+
+
+@dataclasses.dataclass(slots=True, frozen=True)
+class JsonFile:
+    # See RunTests.json_file_use_filename()
+    file: int | StrPath
+    file_type: str
+
+    def configure_subprocess(self, popen_kwargs: dict) -> None:
+        match self.file_type:
+            case JsonFileType.UNIX_FD:
+                # Unix file descriptor
+                popen_kwargs['pass_fds'] = [self.file]
+            case JsonFileType.WINDOWS_HANDLE:
+                # Windows handle
+                startupinfo = subprocess.STARTUPINFO()
+                startupinfo.lpAttributeList = {"handle_list": [self.file]}
+                popen_kwargs['startupinfo'] = startupinfo
+            case JsonFileType.FILENAME:
+                # Filename: nothing to do
+                pass
+
+    @contextlib.contextmanager
+    def inherit_subprocess(self):
+        if self.file_type == JsonFileType.WINDOWS_HANDLE:
+            os.set_handle_inheritable(self.file, True)
+            try:
+                yield
+            finally:
+                os.set_handle_inheritable(self.file, False)
+        else:
+            yield
+
+    def open(self, mode='r', *, encoding):
+        file = self.file
+        if self.file_type == JsonFileType.WINDOWS_HANDLE:
+            import msvcrt
+            # Create a file descriptor from the handle
+            file = msvcrt.open_osfhandle(file, os.O_WRONLY)
+        return open(file, mode, encoding=encoding)
+
+
 @dataclasses.dataclass(slots=True, frozen=True)
 class HuntRefleak:
     warmups: int
@@ -38,9 +89,7 @@ class RunTests:
     python_cmd: tuple[str] | None
     randomize: bool
     random_seed: int | None
-    # On Unix, it's a file descriptor.
-    # On Windows, it's a handle.
-    json_fd: int | None
+    json_file: JsonFile | None
 
     def copy(self, **override):
         state = dataclasses.asdict(self)
@@ -74,6 +123,17 @@ class RunTests:
     def from_json(worker_json: StrJSON) -> 'RunTests':
         return json.loads(worker_json, object_hook=_decode_runtests)
 
+    def json_file_use_filename(self) -> bool:
+        # json_file type depends on the platform:
+        # - Unix: file descriptor (int)
+        # - Windows: handle (int)
+        # - Emscripten/WASI or if --python is used: filename (str)
+        return (
+            bool(self.python_cmd)
+            or support.is_emscripten
+            or support.is_wasi
+        )
+
 
 class _EncodeRunTests(json.JSONEncoder):
     def default(self, o: Any) -> dict[str, Any]:
@@ -90,6 +150,8 @@ def _decode_runtests(data: dict[str, Any]) -> RunTests | dict[str, Any]:
         data.pop('__runtests__')
         if data['hunt_refleak']:
             data['hunt_refleak'] = HuntRefleak(**data['hunt_refleak'])
+        if data['json_file']:
+            data['json_file'] = JsonFile(**data['json_file'])
         return RunTests(**data)
     else:
         return data
Lib/test/libregrtest/utils.py
index 03c27b9fe17053df50064da60702edbddef1496c..ce7342aabfffbe84ba590c6ce55cbeb7ce8c6289 100644
@@ -17,8 +17,12 @@ from test.support import threading_helper
 
 
 MS_WINDOWS = (sys.platform == 'win32')
-WORK_DIR_PREFIX = 'test_python_'
-WORKER_WORK_DIR_PREFIX = f'{WORK_DIR_PREFIX}worker_'
+
+# All temporary files and temporary directories created by libregrtest should
+# use TMP_PREFIX so cleanup_temp_dir() can remove them all.
+TMP_PREFIX = 'test_python_'
+WORK_DIR_PREFIX = TMP_PREFIX
+WORKER_WORK_DIR_PREFIX = WORK_DIR_PREFIX + 'worker_'
 
 # bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()).
 # Used to protect against threading._shutdown() hang.
@@ -387,7 +391,7 @@ def get_work_dir(parent_dir: StrPath, worker: bool = False) -> StrPath:
     # testing (see the -j option).
     # Emscripten and WASI have stubbed getpid(), Emscripten has only
     # milisecond clock resolution. Use randint() instead.
-    if sys.platform in {"emscripten", "wasi"}:
+    if support.is_emscripten or support.is_wasi:
         nounce = random.randint(0, 1_000_000)
     else:
         nounce = os.getpid()
@@ -580,7 +584,7 @@ def display_header():
 def cleanup_temp_dir(tmp_dir: StrPath):
     import glob
 
-    path = os.path.join(glob.escape(tmp_dir), WORK_DIR_PREFIX + '*')
+    path = os.path.join(glob.escape(tmp_dir), TMP_PREFIX + '*')
     print("Cleanup %s directory" % tmp_dir)
     for name in glob.glob(path):
         if os.path.isdir(name):
Lib/test/libregrtest/worker.py
index 0963faa2e4d2a1daf30b5fe5b363ff424ecc89b1..ae3dcb2b06ed36cda7302d9368588209e9f358b7 100644
@@ -7,18 +7,17 @@ from test import support
 from test.support import os_helper
 
 from .setup import setup_process, setup_test_dir
-from .runtests import RunTests
+from .runtests import RunTests, JsonFile
 from .single import run_single_test
 from .utils import (
-    StrPath, StrJSON, FilterTuple, MS_WINDOWS,
+    StrPath, StrJSON, FilterTuple,
     get_temp_dir, get_work_dir, exit_timeout)
 
 
 USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))
 
 
-def create_worker_process(runtests: RunTests,
-                          output_fd: int, json_fd: int,
+def create_worker_process(runtests: RunTests, output_fd: int,
                           tmp_dir: StrPath | None = None) -> subprocess.Popen:
     python_cmd = runtests.python_cmd
     worker_json = runtests.as_json()
@@ -55,34 +54,24 @@ def create_worker_process(runtests: RunTests,
         close_fds=True,
         cwd=work_dir,
     )
-    if not MS_WINDOWS:
-        kwargs['pass_fds'] = [json_fd]
-    else:
-        startupinfo = subprocess.STARTUPINFO()
-        startupinfo.lpAttributeList = {"handle_list": [json_fd]}
-        kwargs['startupinfo'] = startupinfo
-    if USE_PROCESS_GROUP:
-        kwargs['start_new_session'] = True
-
-    if MS_WINDOWS:
-        os.set_handle_inheritable(json_fd, True)
-    try:
+
+    # Pass json_file to the worker process
+    json_file = runtests.json_file
+    json_file.configure_subprocess(kwargs)
+
+    with json_file.inherit_subprocess():
         return subprocess.Popen(cmd, **kwargs)
-    finally:
-        if MS_WINDOWS:
-            os.set_handle_inheritable(json_fd, False)
 
 
 def worker_process(worker_json: StrJSON) -> NoReturn:
     runtests = RunTests.from_json(worker_json)
     test_name = runtests.tests[0]
     match_tests: FilterTuple | None = runtests.match_tests
-    json_fd: int = runtests.json_fd
-
-    if MS_WINDOWS:
-        import msvcrt
-        json_fd = msvcrt.open_osfhandle(json_fd, os.O_WRONLY)
-
+    # json_file type depends on the platform:
+    # - Unix: file descriptor (int)
+    # - Windows: handle (int)
+    # - Emscripten/WASI or if --python is used: filename (str)
+    json_file: JsonFile = runtests.json_file
 
     setup_test_dir(runtests.test_dir)
     setup_process()
@@ -96,8 +85,8 @@ def worker_process(worker_json: StrJSON) -> NoReturn:
 
     result = run_single_test(test_name, runtests)
 
-    with open(json_fd, 'w', encoding='utf-8') as json_file:
-        result.write_json_into(json_file)
+    with json_file.open('w', encoding='utf-8') as json_fp:
+        result.write_json_into(json_fp)
 
     sys.exit(0)
 
Lib/test/test_regrtest.py
index 7cf3d05a6e6d7089ba7f490e88bef21feab920af..55cf9e7f020721468db0e6faef59bc638666bafe 100644
@@ -13,6 +13,7 @@ import os.path
 import platform
 import random
 import re
+import shlex
 import subprocess
 import sys
 import sysconfig
@@ -432,13 +433,14 @@ class BaseTestCase(unittest.TestCase):
         parser = re.finditer(regex, output, re.MULTILINE)
         return list(match.group(1) for match in parser)
 
-    def check_executed_tests(self, output, tests, skipped=(), failed=(),
+    def check_executed_tests(self, output, tests, *, stats,
+                             skipped=(), failed=(),
                              env_changed=(), omitted=(),
                              rerun=None, run_no_tests=(),
                              resource_denied=(),
-                             randomize=False, interrupted=False,
+                             randomize=False, parallel=False, interrupted=False,
                              fail_env_changed=False,
-                             *, stats, forever=False, filtered=False):
+                             forever=False, filtered=False):
         if isinstance(tests, str):
             tests = [tests]
         if isinstance(skipped, str):
@@ -455,6 +457,8 @@ class BaseTestCase(unittest.TestCase):
             run_no_tests = [run_no_tests]
         if isinstance(stats, int):
             stats = TestStats(stats)
+        if parallel:
+            randomize = True
 
         rerun_failed = []
         if rerun is not None:
@@ -1120,7 +1124,7 @@ class ArgsTestCase(BaseTestCase):
         tests = [crash_test]
         output = self.run_tests("-j2", *tests, exitcode=EXITCODE_BAD_TEST)
         self.check_executed_tests(output, tests, failed=crash_test,
-                                  randomize=True, stats=0)
+                                  parallel=True, stats=0)
 
     def parse_methods(self, output):
         regex = re.compile("^(test[^ ]+).*ok$", flags=re.MULTILINE)
@@ -1744,7 +1748,7 @@ class ArgsTestCase(BaseTestCase):
         self.check_executed_tests(output, testnames,
                                   env_changed=testnames,
                                   fail_env_changed=True,
-                                  randomize=True,
+                                  parallel=True,
                                   stats=len(testnames))
         for testname in testnames:
             self.assertIn(f"Warning -- {testname} leaked temporary "
@@ -1784,7 +1788,7 @@ class ArgsTestCase(BaseTestCase):
                                 exitcode=EXITCODE_BAD_TEST)
         self.check_executed_tests(output, [testname],
                                   failed=[testname],
-                                  randomize=True,
+                                  parallel=True,
                                   stats=0)
 
     def test_doctest(self):
@@ -1823,7 +1827,7 @@ class ArgsTestCase(BaseTestCase):
                                 exitcode=EXITCODE_BAD_TEST)
         self.check_executed_tests(output, [testname],
                                   failed=[testname],
-                                  randomize=True,
+                                  parallel=True,
                                   stats=TestStats(1, 1, 0))
 
     def _check_random_seed(self, run_workers: bool):
@@ -1866,6 +1870,27 @@ class ArgsTestCase(BaseTestCase):
     def test_random_seed_workers(self):
         self._check_random_seed(run_workers=True)
 
+    def test_python_command(self):
+        code = textwrap.dedent(r"""
+            import sys
+            import unittest
+
+            class WorkerTests(unittest.TestCase):
+                def test_dev_mode(self):
+                    self.assertTrue(sys.flags.dev_mode)
+        """)
+        tests = [self.create_test(code=code) for _ in range(3)]
+
+        # Custom Python command: "python -X dev"
+        python_cmd = [sys.executable, '-X', 'dev']
+        # test.libregrtest.cmdline uses shlex.split() to parse the Python
+        # command line string
+        python_cmd = shlex.join(python_cmd)
+
+        output = self.run_tests("--python", python_cmd, "-j0", *tests)
+        self.check_executed_tests(output, tests,
+                                  stats=len(tests), parallel=True)
+
 
 class TestUtils(unittest.TestCase):
     def test_format_duration(self):