encoding = locale.getencoding()
else:
encoding = sys.stdout.encoding
+
# gh-94026: Write stdout+stderr to a tempfile as workaround for
# non-blocking pipes on Emscripten with NodeJS.
with tempfile.TemporaryFile('w+', encoding=encoding) as stdout_fh:
# Python finalization: too late for libregrtest.
retcode = self._run_process(test_name, stdout_fh)
stdout_fh.seek(0)
- stdout = stdout_fh.read().strip()
+
+ try:
+ stdout = stdout_fh.read().strip()
+ except Exception as exc:
+ # gh-101634: Catch UnicodeDecodeError if stdout cannot be
+ # decoded from encoding
+ err_msg = f"Cannot read process stdout: {exc}"
+ return self.mp_result_error(ChildError(test_name), '', err_msg)
if retcode is None:
return self.mp_result_error(Timeout(test_name), stdout)
# Thread got an exception
format_exc = item[1]
print_warning(f"regrtest worker thread failed: {format_exc}")
+ result = ChildError("<regrtest worker>")
+ self.regrtest.accumulate_result(result)
return True
self.test_index += 1
import contextlib
import glob
import io
+import locale
import os.path
import platform
import re
for name in names:
self.assertFalse(os.path.exists(name), name)
+ def test_mp_decode_error(self):
+ # gh-101634: If a worker stdout cannot be decoded, report a failed test
+ # and a non-zero exit code.
+ if sys.platform == 'win32':
+ encoding = locale.getencoding()
+ else:
+ encoding = sys.stdout.encoding
+ if encoding is None:
+ encoding = sys.__stdout__.encoding
+ if encoding is None:
+ self.skipTest(f"cannot get regrtest worker encoding")
+
+ nonascii = b"byte:\xa0\xa9\xff\n"
+ try:
+ nonascii.decode(encoding)
+ except UnicodeDecodeError:
+ pass
+ else:
+ self.skipTest(f"{encoding} can decode non-ASCII bytes {nonascii!a}")
+
+ code = textwrap.dedent(fr"""
+ import sys
+ # bytes which cannot be decoded from UTF-8
+ nonascii = {nonascii!a}
+ sys.stdout.buffer.write(nonascii)
+ sys.stdout.buffer.flush()
+ """)
+ testname = self.create_test(code=code)
+
+ output = self.run_tests("--fail-env-changed", "-v", "-j1", testname,
+ exitcode=EXITCODE_BAD_TEST)
+ self.check_executed_tests(output, [testname],
+ failed=[testname],
+ randomize=True)
+
class TestUtils(unittest.TestCase):
def test_format_duration(self):