-import contextlib
import faulthandler
import functools
import gc
import io
import os
import sys
-import tempfile
import time
import traceback
import unittest
return 'test.' + test_name
-@contextlib.contextmanager
-def override_fd(fd, fd2):
- fd2_copy = os.dup(fd2)
- try:
- os.dup2(fd, fd2)
- yield
- finally:
- os.dup2(fd2_copy, fd2)
- os.close(fd2_copy)
-
-
-def get_stream_fd(stream):
- if stream is None:
- return None
- try:
- return stream.fileno()
- except io.UnsupportedOperation:
- return None
-
-
-@contextlib.contextmanager
-def capture_std_streams():
- """
- Redirect all standard streams to a temporary file:
-
- * stdout and stderr file descriptors (fd 1 and fd 2)
- * sys.stdout, sys.__stdout__
- * sys.stderr, sys.__stderr__
- """
- try:
- stderr_fd = sys.stderr.fileno()
- except io.UnsupportedOperation:
- stderr_fd = None
-
- # Use a temporary file to support fileno() operation
- tmp_file = tempfile.TemporaryFile(mode='w+',
- # line buffering
- buffering=1,
- encoding=sys.stderr.encoding,
- errors=sys.stderr.errors)
- with contextlib.ExitStack() as stack:
- stack.enter_context(tmp_file)
-
- # Override stdout and stderr file descriptors
- tmp_fd = tmp_file.fileno()
- for stream in (sys.stdout, sys.stderr):
- fd = get_stream_fd(stream)
- if fd is not None:
- stack.enter_context(override_fd(tmp_fd, fd))
-
- # Override sys attributes
- for name in ('stdout', 'stderr', '__stdout__', '__stderr__'):
- stack.enter_context(support.swap_attr(sys, name, tmp_file))
-
- yield tmp_file
-
-
def _runtest(ns: Namespace, test_name: str) -> TestResult:
# Handle faulthandler timeout, capture stdout+stderr, XML serialization
# and measure time.
if output_on_failure:
support.verbose = True
+ stream = io.StringIO()
+ orig_stdout = sys.stdout
+ orig_stderr = sys.stderr
output = None
- with capture_std_streams() as stream:
+ try:
+ sys.stdout = stream
+ sys.stderr = stream
result = _runtest_inner(ns, test_name,
display_failure=False)
if not isinstance(result, Passed):
- stream.seek(0)
- output = stream.read()
+ output = stream.getvalue()
+ finally:
+ sys.stdout = orig_stdout
+ sys.stderr = orig_stderr
if output is not None:
sys.stderr.write(output)
def print_warning(msg):
# bpo-45410: Explicitly flush stdout to keep logs in order
flush_std_streams()
- # bpo-39983: Print into sys.__stderr__ to display the warning even
- # when sys.stderr is captured temporarily by a test
- stream = sys.__stderr__
+ stream = print_warning.orig_stderr
for line in msg.splitlines():
print(f"Warning -- {line}", file=stream)
stream.flush()
+# bpo-39983: Store the original sys.stderr at Python startup to be able to
+# log warnings even if sys.stderr is captured temporarily by a test.
+print_warning.orig_stderr = sys.stderr
+
# Flag used by saved_test_environment of test.libregrtest.save_env,
# to check if a test modified the environment. The flag should be set to False
if time.monotonic() > deadline:
self.fail("timeout")
- old_stderr = sys.__stderr__
- try:
- sys.__stderr__ = stderr
+ with support.swap_attr(support.print_warning, 'orig_stderr', stderr):
support.reap_children()
- finally:
- sys.__stderr__ = old_stderr
# Use environment_altered to check if reap_children() found
# the child process
def check_print_warning(self, msg, expected):
stderr = io.StringIO()
-
- old_stderr = sys.__stderr__
- try:
- sys.__stderr__ = stderr
+ with support.swap_attr(support.print_warning, 'orig_stderr', stderr):
support.print_warning(msg)
- finally:
- sys.__stderr__ = old_stderr
-
self.assertEqual(stderr.getvalue(), expected)
def test_print_warning(self):