target of the "as" clause, if there is one.
+.. function:: print_warning(msg)
+
+ Print a warning into :data:`sys.__stderr__`. Format the message as:
+ ``f"Warning -- {msg}"``. If *msg* is made of multiple lines, add
+ ``"Warning -- "`` prefix to each line.
+
+ .. versionadded:: 3.9
+
+
.. function:: wait_process(pid, *, exitcode, timeout=None)
Wait until process *pid* completes and check that the process exit code is
dt = time.monotonic() - start_time
if dt >= 5.0:
test.support.environment_altered = True
- print("Warning -- multiprocessing.Manager still has %s active "
- "children after %s seconds"
- % (multiprocessing.active_children(), dt),
- file=sys.stderr)
+ support.print_warning(f"multiprocessing.Manager still has "
+ f"{multiprocessing.active_children()} "
+ f"active children after {dt} seconds")
break
def run_worker(self, worker, obj):
processes = set(multiprocessing.process._dangling) - set(cls.dangling[0])
if processes:
test.support.environment_altered = True
- print('Warning -- Dangling processes: %s' % processes,
- file=sys.stderr)
+ support.print_warning(f'Dangling processes: {processes}')
processes = None
threads = set(threading._dangling) - set(cls.dangling[1])
if threads:
test.support.environment_altered = True
- print('Warning -- Dangling threads: %s' % threads,
- file=sys.stderr)
+ support.print_warning(f'Dangling threads: {threads}')
threads = None
dt = time.monotonic() - start_time
if dt >= 5.0:
test.support.environment_altered = True
- print("Warning -- multiprocessing.Manager still has %s active "
- "children after %s seconds"
- % (multiprocessing.active_children(), dt),
- file=sys.stderr)
+ support.print_warning(f"multiprocessing.Manager still has "
+ f"{multiprocessing.active_children()} "
+ f"active children after {dt} seconds")
break
gc.collect() # do garbage collection
# ensure that all processes which hold a reference to a
# managed object have been joined.
test.support.environment_altered = True
- print('Warning -- Shared objects which still exist at manager '
- 'shutdown:')
- print(cls.manager._debug_info())
+ support.print_warning('Shared objects which still exist '
+ 'at manager shutdown:')
+ support.print_warning(cls.manager._debug_info())
cls.manager.shutdown()
cls.manager.join()
cls.manager = None
if processes:
need_sleep = True
test.support.environment_altered = True
- print('Warning -- Dangling processes: %s' % processes,
- file=sys.stderr)
+ support.print_warning(f'Dangling processes: {processes}')
processes = None
threads = set(threading._dangling) - set(dangling[1])
if threads:
need_sleep = True
test.support.environment_altered = True
- print('Warning -- Dangling threads: %s' % threads,
- file=sys.stderr)
+ support.print_warning(f'Dangling threads: {threads}')
threads = None
# Sleep 500 ms to give time to child processes to complete.
f"directory nor file")
if verbose:
- print_warning("%r left behind %s %r" % (test_name, kind, name))
+ print_warning(f"{test_name} left behind {kind} {name!r}")
support.environment_altered = True
try:
def print_warning(msg):
- print(f"Warning -- {msg}", file=sys.stderr, flush=True)
+ support.print_warning(msg)
orig_unraisablehook = None
#=======================================================================
# Support for saving and restoring the imported modules.
+def print_warning(msg):
+ # bpo-39983: Print into sys.__stderr__ to display the warning even
+ # when sys.stderr is captured temporarily by a test
+ for line in msg.splitlines():
+ print(f"Warning -- {line}", file=sys.__stderr__, flush=True)
+
def modules_setup():
return sys.modules.copy(),
# Display a warning at the first iteration
environment_altered = True
dangling_threads = values[1]
- print("Warning -- threading_cleanup() failed to cleanup "
- "%s threads (count: %s, dangling: %s)"
- % (values[0] - original_values[0],
- values[0], len(dangling_threads)),
- file=sys.stderr)
+ print_warning(f"threading_cleanup() failed to cleanup "
+ f"{values[0] - original_values[0]} threads "
+ f"(count: {values[0]}, "
+ f"dangling: {len(dangling_threads)})")
for thread in dangling_threads:
- print(f"Dangling thread: {thread!r}", file=sys.stderr)
- sys.stderr.flush()
+ print_warning(f"Dangling thread: {thread!r}")
# Don't hold references to threads
dangling_threads = None
if pid == 0:
break
- print("Warning -- reap_children() reaped child process %s"
- % pid, file=sys.stderr)
+ print_warning(f"reap_children() reaped child process {pid}")
environment_altered = True
if time.monotonic() > deadline:
self.fail("timeout")
- with contextlib.redirect_stderr(stderr):
+ old_stderr = sys.__stderr__
+ try:
+ sys.__stderr__ = stderr
support.reap_children()
+ finally:
+ sys.__stderr__ = old_stderr
# Use environment_altered to check if reap_children() found
# the child process
os.close(fd)
self.assertEqual(more - start, 1)
+ def check_print_warning(self, msg, expected):
+ stderr = io.StringIO()
+
+ old_stderr = sys.__stderr__
+ try:
+ sys.__stderr__ = stderr
+ support.print_warning(msg)
+ finally:
+ sys.__stderr__ = old_stderr
+
+ self.assertEqual(stderr.getvalue(), expected)
+
+ def test_print_warning(self):
+ self.check_print_warning("msg",
+ "Warning -- msg\n")
+ self.check_print_warning("a\nb",
+ 'Warning -- a\nWarning -- b\n')
+
# XXX -follows a list of untested API
# make_legacy_pyc
# is_resource_enabled