]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
bpo-39983: Add test.support.print_warning() (GH-19683) (GH-19687)
authorVictor Stinner <vstinner@python.org>
Thu, 23 Apr 2020 21:55:07 +0000 (23:55 +0200)
committerGitHub <noreply@github.com>
Thu, 23 Apr 2020 21:55:07 +0000 (23:55 +0200)
Log "Warning -- ..." test warnings into sys.__stderr__ rather than
sys.stderr, to ensure to display them even if sys.stderr is captured.

test.libregrtest.utils.print_warning() now calls
test.support.print_warning().

(cherry picked from commit d663d34685e18588748569468c672763f4c73b3e)

Lib/test/_test_multiprocessing.py
Lib/test/libregrtest/runtest.py
Lib/test/libregrtest/utils.py
Lib/test/support/__init__.py
Lib/test/test_support.py

index d5336e4d7be301dee1c4c9d0d2bcdc575cb0fc69..5943dd83cc143ef3238c69d387926d655b565035 100644 (file)
@@ -5310,10 +5310,9 @@ class TestSyncManagerTypes(unittest.TestCase):
             dt = time.monotonic() - start_time
             if dt >= 5.0:
                 test.support.environment_altered = True
-                print("Warning -- multiprocessing.Manager still has %s active "
-                      "children after %s seconds"
-                      % (multiprocessing.active_children(), dt),
-                      file=sys.stderr)
+                support.print_warning(f"multiprocessing.Manager still has "
+                                      f"{multiprocessing.active_children()} "
+                                      f"active children after {dt} seconds")
                 break
 
     def run_worker(self, worker, obj):
@@ -5513,15 +5512,13 @@ class BaseMixin(object):
         processes = set(multiprocessing.process._dangling) - set(cls.dangling[0])
         if processes:
             test.support.environment_altered = True
-            print('Warning -- Dangling processes: %s' % processes,
-                  file=sys.stderr)
+            support.print_warning(f'Dangling processes: {processes}')
         processes = None
 
         threads = set(threading._dangling) - set(cls.dangling[1])
         if threads:
             test.support.environment_altered = True
-            print('Warning -- Dangling threads: %s' % threads,
-                  file=sys.stderr)
+            support.print_warning(f'Dangling threads: {threads}')
         threads = None
 
 
@@ -5589,10 +5586,9 @@ class ManagerMixin(BaseMixin):
             dt = time.monotonic() - start_time
             if dt >= 5.0:
                 test.support.environment_altered = True
-                print("Warning -- multiprocessing.Manager still has %s active "
-                      "children after %s seconds"
-                      % (multiprocessing.active_children(), dt),
-                      file=sys.stderr)
+                support.print_warning(f"multiprocessing.Manager still has "
+                                      f"{multiprocessing.active_children()} "
+                                      f"active children after {dt} seconds")
                 break
 
         gc.collect()                       # do garbage collection
@@ -5601,9 +5597,9 @@ class ManagerMixin(BaseMixin):
             # ensure that all processes which hold a reference to a
             # managed object have been joined.
             test.support.environment_altered = True
-            print('Warning -- Shared objects which still exist at manager '
-                  'shutdown:')
-            print(cls.manager._debug_info())
+            support.print_warning('Shared objects which still exist '
+                                  'at manager shutdown:')
+            support.print_warning(cls.manager._debug_info())
         cls.manager.shutdown()
         cls.manager.join()
         cls.manager = None
@@ -5700,16 +5696,14 @@ def install_tests_in_module_dict(remote_globs, start_method):
         if processes:
             need_sleep = True
             test.support.environment_altered = True
-            print('Warning -- Dangling processes: %s' % processes,
-                  file=sys.stderr)
+            support.print_warning(f'Dangling processes: {processes}')
         processes = None
 
         threads = set(threading._dangling) - set(dangling[1])
         if threads:
             need_sleep = True
             test.support.environment_altered = True
-            print('Warning -- Dangling threads: %s' % threads,
-                  file=sys.stderr)
+            support.print_warning(f'Dangling threads: {threads}')
         threads = None
 
         # Sleep 500 ms to give time to child processes to complete.
index 558f2099c66f5eb5ec180a1b4afb8c78d9b05edc..9338b28047954e0b4632d2869f263442c87efaac 100644 (file)
@@ -327,7 +327,7 @@ def cleanup_test_droppings(test_name, verbose):
                                f"directory nor file")
 
         if verbose:
-            print_warning("%r left behind %s %r" % (test_name, kind, name))
+            print_warning(f"{test_name} left behind {kind} {name!r}")
             support.environment_altered = True
 
         try:
index 98a60f7a747d9296831b7c1bb56390c167ec6cda..0467c8f8f56b6a1f40bdf7bef3a2f928b0f6ef1b 100644 (file)
@@ -2,6 +2,7 @@ import math
 import os.path
 import sys
 import textwrap
+from test import support
 
 
 def format_duration(seconds):
@@ -61,4 +62,4 @@ def printlist(x, width=70, indent=4, file=None):
 
 
 def print_warning(msg):
-    print(f"Warning -- {msg}", file=sys.stderr, flush=True)
+    support.print_warning(msg)
index a42c6f771b3d308b3b8c381b7e0cabc5a043032c..400eebc521454e2a14f1e400552a54e6cf79abaa 100644 (file)
@@ -2201,6 +2201,12 @@ def run_doctest(module, verbosity=None, optionflags=0):
 #=======================================================================
 # Support for saving and restoring the imported modules.
 
+def print_warning(msg):
+    # bpo-39983: Print into sys.__stderr__ to display the warning even
+    # when sys.stderr is captured temporarily by a test
+    for line in msg.splitlines():
+        print(f"Warning -- {line}", file=sys.__stderr__, flush=True)
+
 def modules_setup():
     return sys.modules.copy(),
 
@@ -2256,14 +2262,12 @@ def threading_cleanup(*original_values):
             # Display a warning at the first iteration
             environment_altered = True
             dangling_threads = values[1]
-            print("Warning -- threading_cleanup() failed to cleanup "
-                  "%s threads (count: %s, dangling: %s)"
-                  % (values[0] - original_values[0],
-                     values[0], len(dangling_threads)),
-                  file=sys.stderr)
+            print_warning(f"threading_cleanup() failed to cleanup "
+                          f"{values[0] - original_values[0]} threads "
+                          f"(count: {values[0]}, "
+                          f"dangling: {len(dangling_threads)})")
             for thread in dangling_threads:
-                print(f"Dangling thread: {thread!r}", file=sys.stderr)
-            sys.stderr.flush()
+                print_warning(f"Dangling thread: {thread!r}")
 
             # Don't hold references to threads
             dangling_threads = None
@@ -2356,8 +2360,7 @@ def reap_children():
         if pid == 0:
             break
 
-        print("Warning -- reap_children() reaped child process %s"
-              % pid, file=sys.stderr)
+        print_warning(f"reap_children() reaped child process {pid}")
         environment_altered = True
 
 
index e3ce670b8437f9b779e5e51f0e4e8d8d50d3a34a..80e652d7005fc6135861cdbf887e52a992312081 100644 (file)
@@ -433,8 +433,12 @@ class TestSupport(unittest.TestCase):
                 if time.monotonic() > deadline:
                     self.fail("timeout")
 
-                with contextlib.redirect_stderr(stderr):
+                old_stderr = sys.__stderr__
+                try:
+                    sys.__stderr__ = stderr
                     support.reap_children()
+                finally:
+                    sys.__stderr__ = old_stderr
 
                 # Use environment_altered to check if reap_children() found
                 # the child process
@@ -633,6 +637,24 @@ class TestSupport(unittest.TestCase):
             os.close(fd)
         self.assertEqual(more - start, 1)
 
+    def check_print_warning(self, msg, expected):
+        stderr = io.StringIO()
+
+        old_stderr = sys.__stderr__
+        try:
+            sys.__stderr__ = stderr
+            support.print_warning(msg)
+        finally:
+            sys.__stderr__ = old_stderr
+
+        self.assertEqual(stderr.getvalue(), expected)
+
+    def test_print_warning(self):
+        self.check_print_warning("msg",
+                                 "Warning -- msg\n")
+        self.check_print_warning("a\nb",
+                                 'Warning -- a\nWarning -- b\n')
+
     # XXX -follows a list of untested API
     # make_legacy_pyc
     # is_resource_enabled