Don't ignore errors raised by `PyErr_WarnFormat` in `warn_about_fork_with_threads`
Instead, ignore the warnings in all test code that forks. (That's a lot of functions.)
In `test.support.warnings_helper`, make `ignore_warnings` usable as a context manager
(as well as a decorator), and add a `message` argument to it.
Also add an `ignore_fork_in_thread_deprecation_warnings` helper for the
"fork() may lead to deadlocks in the child" DeprecationWarning.
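
A rough sketch of the intended usage, both forms (the test class and its
contents here are hypothetical, made up for illustration; only the helper
itself comes from this change):

    import multiprocessing
    import unittest

    from test.support import warnings_helper

    class ExampleForkingTests(unittest.TestCase):

        def setUp(self):
            # Context-manager form: useful when only one statement in a
            # fixture forks (e.g. starting a Manager).
            with warnings_helper.ignore_fork_in_thread_deprecation_warnings():
                self.manager = multiprocessing.Manager()
            self.addCleanup(self.manager.shutdown)

        # Decorator form (preferred): the whole test body runs with the
        # deadlock-in-fork DeprecationWarning suppressed.
        @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
        def test_pool(self):
            with multiprocessing.Pool(2) as pool:
                self.assertEqual(pool.apply(abs, (-1,)), 1)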
from test.support import socket_helper
from test.support import threading_helper
from test.support import warnings_helper
-
+from test.support.script_helper import assert_python_failure, assert_python_ok
# Skip tests if _multiprocessing wasn't built.
_multiprocessing = import_helper.import_module('_multiprocessing')
self.assertEqual(current.ident, os.getpid())
self.assertEqual(current.exitcode, None)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_set_executable(self):
if self.TYPE == 'threads':
self.skipTest(f'test not appropriate for {self.TYPE}')
p.join()
self.assertEqual(p.exitcode, 0)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@support.requires_resource('cpu')
def test_args_argument(self):
# bpo-45735: Using list or tuple as *args* in constructor could
q.put(bytes(current.authkey))
q.put(current.pid)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_parent_process_attributes(self):
if self.TYPE == "threads":
self.skipTest('test not appropriate for {}'.format(self.TYPE))
from multiprocessing.process import parent_process
wconn.send([parent_process().pid, parent_process().name])
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_parent_process(self):
if self.TYPE == "threads":
self.skipTest('test not appropriate for {}'.format(self.TYPE))
parent_process().join(timeout=support.SHORT_TIMEOUT)
wconn.send("alive" if parent_process().is_alive() else "not alive")
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_process(self):
q = self.Queue(1)
e = self.Event()
self.assertNotIn(p, self.active_children())
close_queue(q)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@unittest.skipUnless(threading._HAVE_THREAD_NATIVE_ID, "needs native_id")
def test_process_mainthread_native_id(self):
if self.TYPE == 'threads':
def _test_sleep(cls, delay):
time.sleep(delay)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def _kill_process(self, meth, target=None):
if self.TYPE == 'threads':
self.skipTest('test not appropriate for {}'.format(self.TYPE))
return p.exitcode
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@unittest.skipIf(os.name == 'nt', "POSIX only")
def test_interrupt(self):
exitcode = self._kill_process(multiprocessing.Process.interrupt)
# (KeyboardInterrupt in this case)
# in multiprocessing.BaseProcess._bootstrap
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@unittest.skipIf(os.name == 'nt', "POSIX only")
def test_interrupt_no_handler(self):
exitcode = self._kill_process(multiprocessing.Process.interrupt, target=self._sleep_no_int_handler)
self.assertEqual(exitcode, -signal.SIGINT)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_terminate(self):
exitcode = self._kill_process(multiprocessing.Process.terminate)
self.assertEqual(exitcode, -signal.SIGTERM)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_kill(self):
exitcode = self._kill_process(multiprocessing.Process.kill)
if os.name != 'nt':
self.assertIsInstance(cpus, int)
self.assertGreaterEqual(cpus, 1)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_active_children(self):
self.assertEqual(type(self.active_children()), list)
p.start()
p.join()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_recursion(self):
rconn, wconn = self.Pipe(duplex=False)
self._test_recursion(wconn, [])
def _test_sentinel(cls, event):
event.wait(10.0)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_sentinel(self):
if self.TYPE == "threads":
self.skipTest('test not appropriate for {}'.format(self.TYPE))
q.get()
sys.exit(rc)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_close(self):
if self.TYPE == "threads":
self.skipTest('test not appropriate for {}'.format(self.TYPE))
close_queue(q)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@support.requires_resource('walltime')
def test_many_processes(self):
if self.TYPE == 'threads':
for p in procs:
self.assertIn(p.exitcode, exitcodes)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_lose_target_ref(self):
c = DummyCallable()
wr = weakref.ref(c)
threading.Thread(target=func1).start()
threading.Thread(target=func2, daemon=True).start()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_wait_for_threads(self):
# A child process should wait for non-daemonic threads to end
# before exiting
setattr(sys, stream_name, None)
evt.set()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_error_on_stdio_flush_1(self):
# Check that Process works with broken standard streams
streams = [io.StringIO(), None]
finally:
setattr(sys, stream_name, old_stream)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_error_on_stdio_flush_2(self):
# Same as test_error_on_stdio_flush_1(), but standard streams are
# broken by the child process
ALLOWED_TYPES = ('processes',)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_subclassing(self):
uppercaser = _UpperCaser()
uppercaser.daemon = True
uppercaser.stop()
uppercaser.join()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_stderr_flush(self):
# sys.stderr is flushed at process shutdown (issue #13812)
if self.TYPE == "threads":
sys.stderr = open(fd, 'w', encoding="utf-8", closefd=False)
sys.exit(reason)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_sys_exit(self):
# See Issue 13854
if self.TYPE == 'threads':
queue.get()
parent_can_continue.set()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_put(self):
MAXSIZE = 6
queue = self.Queue(maxsize=MAXSIZE)
queue.put(5)
parent_can_continue.set()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_get(self):
queue = self.Queue()
child_can_start = self.Event()
# process cannot shutdown until the feeder thread has finished
# pushing items onto the pipe.
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_fork(self):
# Old versions of Queue would fail to create a new feeder
# thread for a forked process if the original process had its
time.sleep(DELTA)
q.task_done()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_task_done(self):
queue = self.JoinableQueue()
self.fail("Probable regression on import lock contention;"
" see Issue #22853")
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_timeout(self):
q = multiprocessing.Queue()
start = time.monotonic()
event.set()
time.sleep(1.0)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_repr_lock(self):
if self.TYPE != 'processes':
self.skipTest('test not appropriate for {}'.format(self.TYPE))
res.value = lock.locked()
event.set()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
def test_lock_locked_2processes(self):
if self.TYPE != 'processes':
for _ in range(n):
lock.release()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_repr_rlock(self):
if self.TYPE != 'processes':
self.skipTest('test not appropriate for {}'.format(self.TYPE))
self.assertFalse(lock.locked())
self.assertRaises((AssertionError, RuntimeError), lock.release)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
def test_rlock_locked_2processes(self):
if self.TYPE != 'processes':
except NotImplementedError:
pass
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_notify(self):
cond = self.Condition()
sleeping = self.Semaphore(0)
threading_helper.join_thread(t)
join_process(p)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_notify_all(self):
cond = self.Condition()
sleeping = self.Semaphore(0)
# NOTE: join_process and join_thread are the same
threading_helper.join_thread(w)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_notify_n(self):
cond = self.Condition()
sleeping = self.Semaphore(0)
if not result or state.value != 4:
sys.exit(1)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
def test_waitfor(self):
# based on test in test/lock_tests.py
if not result and (expected - CLOCK_RES) <= dt:
success.value = True
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
def test_waitfor_timeout(self):
# based on test in test/lock_tests.py
if pid is not None:
os.kill(pid, signal.SIGINT)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_wait_result(self):
if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
pid = os.getpid()
time.sleep(TIMEOUT2)
event.set()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_event(self):
event = self.Event()
wait = TimingWrapper(event.wait)
pass
assert not barrier.broken
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_barrier(self, passes=1):
"""
Test that a barrier is passed in lockstep
results = [self.DummyList(), self.DummyList()]
self.run_threads(self.multipass, (self.barrier, results, passes))
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_barrier_10(self):
"""
Test that a barrier works for 10 consecutive runs
res = barrier.wait()
queue.put(res)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_wait_return(self):
"""
test the return value from barrier.wait
if len(results) != 1:
raise RuntimeError
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_action(self):
"""
Test the 'action' callback
except RuntimeError:
barrier.abort()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_abort(self):
"""
Test that an abort will put the barrier in a broken state
barrier.wait()
results3.append(True)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_reset(self):
"""
Test that a 'reset' on a barrier frees the waiting threads
barrier.wait()
results3.append(True)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_abort_and_reset(self):
"""
Test that a barrier can be reset after being broken.
except threading.BrokenBarrierError:
results.append(True)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_timeout(self):
"""
Test wait(timeout)
except threading.BrokenBarrierError:
results.append(True)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_default_timeout(self):
"""
Test the barrier's default timeout
with lock:
conn.send(i)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_thousand(self):
if self.TYPE == 'manager':
self.skipTest('test not appropriate for {}'.format(self.TYPE))
for sv, cv in zip(values, cls.codes_values):
sv.value = cv[2]
-
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_value(self, raw=False):
if raw:
values = [self.RawValue(code, value)
for i in range(1, len(seq)):
seq[i] += seq[i-1]
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@unittest.skipIf(c_int is None, "requires _ctypes")
def test_array(self, raw=False):
seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
@classmethod
def setUpClass(cls):
- super().setUpClass()
- cls.pool = cls.Pool(4)
+ with warnings_helper.ignore_fork_in_thread_deprecation_warnings():
+ super().setUpClass()
+ cls.pool = cls.Pool(4)
@classmethod
def tearDownClass(cls):
self.assertEqual(get(), 49)
self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_async_timeout(self):
p = self.Pool(3)
try:
self.assertIn(value, expected_values)
expected_values.remove(value)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_make_pool(self):
expected_error = (RemoteError if self.TYPE == 'manager'
else ValueError)
p.close()
p.join()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_terminate(self):
# Simulate slow tasks which take "forever" to complete
sleep_time = support.LONG_TIMEOUT
p.terminate()
p.join()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_empty_iterable(self):
# See Issue 12157
p = self.Pool(1)
p.close()
p.join()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_context(self):
if self.TYPE == 'processes':
L = list(range(10))
def _test_traceback(cls):
raise RuntimeError(123) # some comment
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_traceback(self):
# We want ensure that the traceback from the child process is
# contained in the traceback raised in the main process.
p.join()
@classmethod
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def _test_wrapped_exception(cls):
raise RuntimeError('foo')
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_wrapped_exception(self):
# Issue #20980: Should not wrap exception when using thread pool
with self.Pool(1) as p:
p.apply(self._test_wrapped_exception)
p.join()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_map_no_failfast(self):
# Issue #23992: the fail-fast behaviour when an exception is raised
# during map() would make Pool.join() deadlock, because a worker
# they were released too.
self.assertEqual(CountedObject.n_instances, 0)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_enter(self):
if self.TYPE == 'manager':
self.skipTest("test not applicable to manager")
pass
pool.join()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_resource_warning(self):
if self.TYPE == 'manager':
self.skipTest("test not applicable to manager")
class _TestPoolWorkerErrors(BaseTestCase):
ALLOWED_TYPES = ('processes', )
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_async_error_callback(self):
p = multiprocessing.Pool(2)
p.close()
p.join()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_unpickleable_result(self):
from multiprocessing.pool import MaybeEncodingError
p = multiprocessing.Pool(2)
class _TestPoolWorkerLifetime(BaseTestCase):
ALLOWED_TYPES = ('processes', )
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_pool_worker_lifetime(self):
p = multiprocessing.Pool(3, maxtasksperchild=10)
self.assertEqual(3, len(p._pool))
p.close()
p.join()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_pool_worker_lifetime_early_close(self):
# Issue #10332: closing a pool whose workers have limited lifetimes
# before all the tasks completed would make join() hang.
ALLOWED_TYPES = ('manager',)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_mymanager(self):
manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.start()
# which happens on slow buildbots.
self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_mymanager_context(self):
manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
with manager:
# which happens on slow buildbots.
self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_mymanager_context_prestarted(self):
manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.start()
# Note that xmlrpclib will deserialize object as a list not a tuple
queue.put(tuple(cls.values))
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_remote(self):
authkey = os.urandom(32)
queue = manager.get_queue()
queue.put('hello world')
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_rapid_restart(self):
authkey = os.urandom(32)
manager = QueueManager(
class TestManagerExceptions(unittest.TestCase):
# Issue 106558: Manager exceptions avoids creating cyclic references.
def setUp(self):
- self.mgr = multiprocessing.Manager()
+ with warnings_helper.ignore_fork_in_thread_deprecation_warnings():
+ self.mgr = multiprocessing.Manager()
def tearDown(self):
self.mgr.shutdown()
self.mgr.join()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_queue_get(self):
queue = self.mgr.Queue()
if gc.isenabled():
wr = weakref.ref(e)
self.assertEqual(wr(), None)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_dispatch(self):
if gc.isenabled():
gc.disable()
conn.send_bytes(msg)
conn.close()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_connection(self):
conn, child_conn = self.Pipe()
self.assertRaises(OSError, writer.recv)
self.assertRaises(OSError, writer.poll)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_spawn_close(self):
# We test that a pipe connection can be closed by parent
# process immediately after child is spawned. On Windows this
os.write(fd, data)
os.close(fd)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
def test_fd_transfer(self):
if self.TYPE != 'processes':
with open(os_helper.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"foo")
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
@unittest.skipIf(sys.platform == "win32",
"test semantics don't make sense on Windows")
def _send_data_without_fd(self, conn):
os.write(conn.fileno(), b"\0")
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
@unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
def test_missing_fd_transfer(self):
conn.send('hello')
conn.close()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_listener_client(self):
for family in self.connection.families:
l = self.connection.Listener(family=family)
p.join()
l.close()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_issue14725(self):
l = self.connection.Listener()
p = self.Process(target=self._test, args=(l.address,))
conn.send_bytes(s)
conn.close()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_strings(self):
strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
a, b = self.Pipe()
# read from it.
r.poll(5)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_boundaries(self):
r, w = self.Pipe(False)
p = self.Process(target=self._child_boundaries, args=(r,))
b.send_bytes(b'b')
b.send_bytes(b'cd')
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_dont_merge(self):
a, b = self.Pipe()
self.assertEqual(a.poll(0.0), False)
conn.close()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_pickling(self):
families = self.connection.families
conn.close()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_access(self):
# On Windows, if we do not specify a destination pid when
# using DupHandle then we need to be careful to use the
for i in range(len(arr)):
arr[i] *= 2
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_sharedctypes(self, lock=False):
x = Value('i', 7, lock=lock)
y = Value(c_double, 1.0/3.0, lock=lock)
with self.assertRaises(FileNotFoundError):
pickle.loads(pickled_sms)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_shared_memory_across_processes(self):
# bpo-40135: don't define shared memory block's name in case of
# the failure when we run multiprocessing tests in parallel.
sms.close()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@unittest.skipIf(os.name != "posix", "not feasible in non-posix platforms")
def test_shared_memory_SharedMemoryServer_ignores_sigint(self):
# bpo-36368: protect SharedMemoryManager server process from
# properly released sl.
self.assertFalse(err)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_shared_memory_SharedMemoryManager_basics(self):
smm1 = multiprocessing.managers.SharedMemoryManager()
with self.assertRaises(ValueError):
conn.close()
os._exit(0)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_finalize(self):
conn, child_conn = self.Pipe()
logger = multiprocessing.get_logger()
conn.send(logger.getEffectiveLevel())
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_level(self):
LEVEL1 = 32
LEVEL2 = 37
time.sleep(0.1)
os.kill(pid, signal.SIGUSR1)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
def test_poll_eintr(self):
got_signal = [False]
@hashlib_helper.requires_hashdigest('sha256')
class TestInitializers(unittest.TestCase):
def setUp(self):
- self.mgr = multiprocessing.Manager()
- self.ns = self.mgr.Namespace()
- self.ns.test = 0
+ with warnings_helper.ignore_fork_in_thread_deprecation_warnings():
+ self.mgr = multiprocessing.Manager()
+ self.ns = self.mgr.Namespace()
+ self.ns.test = 0
def tearDown(self):
self.mgr.shutdown()
self.mgr.join()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_manager_initializer(self):
m = multiprocessing.managers.SyncManager()
self.assertRaises(TypeError, m.start, 1)
m.shutdown()
m.join()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_pool_initializer(self):
self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
p = multiprocessing.Pool(1, initializer, (self.ns,))
class TestStdinBadfiledescriptor(unittest.TestCase):
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_queue_in_process(self):
proc = multiprocessing.Process(target=_test_process)
proc.start()
proc.join()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_pool_in_process(self):
p = multiprocessing.Process(target=pool_in_process)
p.start()
p.join()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_flushing(self):
sio = io.StringIO()
flike = _file_like(sio)
w.send((i, os.getpid()))
w.close()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_wait(self, slow=False):
from multiprocessing.connection import wait
readers = []
s.sendall(('%s\n' % i).encode('ascii'))
s.close()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_wait_socket(self, slow=False):
from multiprocessing.connection import wait
l = socket.create_server((socket_helper.HOST, 0))
sem.release()
time.sleep(period)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@support.requires_resource('walltime')
def test_wait_integer(self):
from multiprocessing.connection import wait
p.terminate()
p.join()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_neg_timeout(self):
from multiprocessing.connection import wait
a, b = multiprocessing.Pipe()
conn.send(456)
conn.close()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_timeout(self):
old_timeout = socket.getdefaulttimeout()
try:
conn.send(len(util._afterfork_registry))
conn.close()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_lock(self):
r, w = multiprocessing.Pipe(False)
l = util.ForkAwareThreadLock()
s.close()
conn.send(None)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_closefd(self):
if not HAS_REDUCTION:
raise unittest.SkipTest('requires fd pickling')
conn.send(x)
conn.send_bytes(b'x' * cls.CONN_MAX_SIZE)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
def test_ignore(self):
conn, child_conn = multiprocessing.Pipe()
a = l.accept()
a.send('welcome')
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
def test_ignore_listener(self):
conn, child_conn = multiprocessing.Pipe()
p.join()
self.assertEqual(child_method, ctx.get_start_method())
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_context(self):
for method in ('fork', 'spawn', 'forkserver'):
try:
with self.assertRaisesRegex(TypeError, 'module_names must be a list of strings'):
ctx.set_forkserver_preload([1, 2, 3])
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_set_get(self):
multiprocessing.set_forkserver_preload(PRELOAD)
count = 0
print(err)
self.fail("failed spawning forkserver or grandchild")
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@unittest.skipIf(sys.platform == "win32",
"Only Spawn on windows so no risk of mixing")
@only_run_in_spawn_testsuite("avoids redundant testing.")
process.start()
process.join()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_nested_startmethod(self):
# gh-108520: Regression test to ensure that child process can send its
# arguments to another process
reused &= _resource_tracker._check_alive()
conn.send(reused)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_resource_tracker_reused(self):
from multiprocessing.resource_tracker import _resource_tracker
_resource_tracker.ensure_running()
with self.assertRaisesRegex(OSError, 'is closed'):
q.empty()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_empty(self):
queue = multiprocessing.SimpleQueue()
child_can_start = multiprocessing.Event()
def setUp(self):
self.manager = self.manager_class()
- self.manager.start()
+ with warnings_helper.ignore_fork_in_thread_deprecation_warnings():
+ self.manager.start()
self.proc = None
def tearDown(self):
obj.clear()
obj.wait(0.001)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_event(self):
o = self.manager.Event()
o.set()
obj.acquire()
obj.locked()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_lock(self, lname="Lock"):
o = getattr(self.manager, lname)()
self.run_worker(self._test_lock, o)
obj.release()
obj.locked()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_rlock(self, lname="RLock"):
o = getattr(self.manager, lname)()
self.run_worker(self._test_rlock, o)
def _test_semaphore(cls, obj):
obj.acquire()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_semaphore(self, sname="Semaphore"):
o = getattr(self.manager, sname)()
self.run_worker(self._test_semaphore, o)
obj.acquire()
obj.release()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_condition(self):
o = self.manager.Condition()
self.run_worker(self._test_condition, o)
assert obj.parties == 5
obj.reset()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_barrier(self):
o = self.manager.Barrier(5)
self.run_worker(self._test_barrier, o)
with obj:
pass
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_pool(self):
o = self.manager.Pool(processes=4)
self.run_worker(self._test_pool, o)
assert obj.get() == 6
assert obj.empty()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_queue(self, qname="Queue"):
o = getattr(self.manager, qname)(2)
o.put(5)
assert o.empty()
assert not o.full()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_joinable_queue(self):
self.test_queue("JoinableQueue")
obj.clear()
case.assertEqual(len(obj), 0)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_list(self):
o = self.manager.list()
o.append(5)
obj.clear()
case.assertEqual(len(obj), 0)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_dict(self):
o = self.manager.dict()
o['foo'] = 5
case.assertEqual(obj.get(), 1)
obj.set(2)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_value(self):
o = self.manager.Value('i', 1)
self.run_worker(self._test_value, o)
case.assertEqual(len(obj), 2)
case.assertListEqual(list(obj), [0, 1])
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_array(self):
o = self.manager.Array('i', [0, 1])
self.run_worker(self._test_array, o)
case.assertEqual(obj.x, 0)
case.assertEqual(obj.y, 1)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_namespace(self):
o = self.manager.Namespace()
o.x = 0
case.assertGreater(obj, {'a'})
case.assertGreaterEqual(obj, {'a', 'b'})
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_set(self):
o = self.manager.set()
self.run_worker(self._test_set_operator_symbols, o)
self.assertSetEqual(o, {"a", "b", "c"})
self.assertRaises(RemoteError, self.manager.set, 1234)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_set_contain_all_method(self):
o = self.manager.set()
set_methods = {
f.write("deadbeef")
atexit.register(exit_handler)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_atexit(self):
# gh-83856
with os_helper.temp_dir() as temp_dir:
self.assertEqual("332833500", out.decode('utf-8').strip())
self.assertFalse(err, msg=err.decode('utf-8'))
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_forked_thread_not_started(self):
# gh-134381: Ensure that a thread that has not been started yet in
# the parent process can be started within a forked child process.
@classmethod
def setUpClass(cls):
- super().setUpClass()
- cls.manager = multiprocessing.Manager()
+ with warnings_helper.ignore_fork_in_thread_deprecation_warnings():
+ super().setUpClass()
+ cls.manager = multiprocessing.Manager()
@classmethod
def tearDownClass(cls):
name = f'test_semlock_subclass-{os.getpid()}'
s = SemLock(1, 0, 10, name, False)
_multiprocessing.sem_unlink(name)
+
+
+@unittest.skipIf(sys.platform != "linux", "Linux only")
+class ForkInThreads(unittest.TestCase):
+
+ def test_fork(self):
+ code = """
+ import os, sys, threading, time
+
+ t = threading.Thread(target=time.sleep, args=(1,), daemon=True)
+ t.start()
+
+ assert threading.active_count() == 2
+
+ pid = os.fork()
+ if pid < 0:
+ print("Fork failed")
+ elif pid == 0:
+ print("In child")
+ sys.exit(0)
+ print("In parent")
+ """
+
+ res = assert_python_ok("-c", code, PYTHONWARNINGS='always')
+ self.assertIn(b'In child', res.out)
+ self.assertIn(b'In parent', res.out)
+ self.assertIn(b'DeprecationWarning', res.err)
+ self.assertIn(b'is multi-threaded, use of fork() may lead to deadlocks in the child', res.err)
+
+ res = assert_python_failure("-c", code, PYTHONWARNINGS='error')
+ self.assertIn(b'DeprecationWarning', res.err)
+ self.assertIn(b'is multi-threaded, use of fork() may lead to deadlocks in the child', res.err)
+
+ def test_forkpty(self):
+ code = """
+ import os, sys, threading, time
+
+ t = threading.Thread(target=time.sleep, args=(1,), daemon=True)
+ t.start()
+
+ assert threading.active_count() == 2
+
+ pid, _ = os.forkpty()
+ if pid < 0:
+ print("forkpty failed")
+ elif pid == 0:
+ print("In child")
+ sys.exit(0)
+ print("In parent")
+ """
+
+ res = assert_python_ok("-c", code, PYTHONWARNINGS='always')
+ self.assertIn(b'In parent', res.out)
+ self.assertIn(b'DeprecationWarning', res.err)
+ self.assertIn(b'is multi-threaded, use of forkpty() may lead to deadlocks in the child', res.err)
+
+ res = assert_python_failure("-c", code, PYTHONWARNINGS='error')
+ self.assertIn(b'DeprecationWarning', res.err)
+ self.assertIn(b'is multi-threaded, use of forkpty() may lead to deadlocks in the child', res.err)
import contextlib
-import functools
import importlib
import re
import sys
import warnings
+
def import_deprecated(name):
"""Import *name* while suppressing DeprecationWarning."""
with warnings.catch_warnings():
testcase.assertEqual(warns, [])
-def ignore_warnings(*, category):
+@contextlib.contextmanager
+def ignore_warnings(*, category, message=''):
"""Decorator to suppress warnings.
- Use of context managers to hide warnings make diffs
- more noisy and tools like 'git blame' less useful.
+ Can also be used as a context manager. This is not preferred, since it
+ makes diffs noisier and tools like 'git blame' less useful, but it is
+ handy for async functions.
"""
- def decorator(test):
- @functools.wraps(test)
- def wrapper(self, *args, **kwargs):
- with warnings.catch_warnings():
- warnings.simplefilter('ignore', category=category)
- return test(self, *args, **kwargs)
- return wrapper
- return decorator
+ with warnings.catch_warnings():
+ warnings.filterwarnings('ignore', category=category, message=message)
+ yield
+
+
+@contextlib.contextmanager
+def ignore_fork_in_thread_deprecation_warnings():
+ """Suppress deprecation warnings related to forking in multi-threaded code.
+
+ See gh-135427
+
+ Can be used as a decorator (preferred) or as a context manager.
+ """
+ with ignore_warnings(
+ message=".*fork.*may lead to deadlocks in the child.*",
+ category=DeprecationWarning,
+ ):
+ yield
class WarningsRecorder(object):
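
For context, `warnings.filterwarnings()` treats the `message` argument as a
regular expression matched (via `re.match`) against the start of the rendered
warning text, which is why the helper's pattern begins with `.*`. A minimal
sketch, using an illustrative warning string:

    import warnings

    from test.support import warnings_helper

    with warnings_helper.ignore_warnings(
        category=DeprecationWarning,
        message=".*may lead to deadlocks in the child.*",
    ):
        # Suppressed: the filter's regex matches the warning message.
        warnings.warn(
            "This process is multi-threaded, use of fork() may lead to "
            "deadlocks in the child.",
            DeprecationWarning,
        )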
from unittest import mock
from test import support
-from test.support import os_helper
+from test.support import os_helper, warnings_helper
from test.support import socket_helper
from test.support import wait_process
from test.support import hashlib_helper
class TestFork(unittest.IsolatedAsyncioTestCase):
async def test_fork_not_share_event_loop(self):
- # The forked process should not share the event loop with the parent
- loop = asyncio.get_running_loop()
- r, w = os.pipe()
- self.addCleanup(os.close, r)
- self.addCleanup(os.close, w)
- pid = os.fork()
- if pid == 0:
- # child
- try:
- loop = asyncio.get_event_loop()
- os.write(w, b'LOOP:' + str(id(loop)).encode())
- except RuntimeError:
- os.write(w, b'NO LOOP')
- except BaseException as e:
- os.write(w, b'ERROR:' + ascii(e).encode())
- finally:
- os._exit(0)
- else:
- # parent
- result = os.read(r, 100)
- self.assertEqual(result, b'NO LOOP')
- wait_process(pid, exitcode=0)
-
+ with warnings_helper.ignore_fork_in_thread_deprecation_warnings():
+ # The forked process should not share the event loop with the parent
+ loop = asyncio.get_running_loop()
+ r, w = os.pipe()
+ self.addCleanup(os.close, r)
+ self.addCleanup(os.close, w)
+ pid = os.fork()
+ if pid == 0:
+ # child
+ try:
+ loop = asyncio.get_event_loop()
+ os.write(w, b'LOOP:' + str(id(loop)).encode())
+ except RuntimeError:
+ os.write(w, b'NO LOOP')
+ except BaseException as e:
+ os.write(w, b'ERROR:' + ascii(e).encode())
+ finally:
+ os._exit(0)
+ else:
+ # parent
+ result = os.read(r, 100)
+ self.assertEqual(result, b'NO LOOP')
+ wait_process(pid, exitcode=0)
+
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@hashlib_helper.requires_hashdigest('md5')
@support.skip_if_sanitizer("TSAN doesn't support threads after fork", thread=True)
def test_fork_signal_handling(self):
self.assertFalse(parent_handled.is_set())
self.assertTrue(child_handled.is_set())
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@hashlib_helper.requires_hashdigest('md5')
@support.skip_if_sanitizer("TSAN doesn't support threads after fork", thread=True)
def test_fork_asyncio_run(self):
self.assertEqual(result.value, 42)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@hashlib_helper.requires_hashdigest('md5')
@support.skip_if_sanitizer("TSAN doesn't support threads after fork", thread=True)
def test_fork_asyncio_subprocess(self):
from test import support
from test.support import cpython_only, swap_attr
from test.support import async_yield, run_yielding_async_fn
+from test.support import warnings_helper
from test.support.import_helper import import_module
from test.support.os_helper import (EnvironmentVarGuard, TESTFN, unlink)
from test.support.script_helper import assert_python_ok
finally:
signal.signal(signal.SIGHUP, old_sighup)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def _run_child(self, child, terminal_input):
r, w = os.pipe() # Pipe test results from child back to parent
try:
from concurrent import futures
from operator import add
from test import support
-from test.support import Py_GIL_DISABLED
+from test.support import Py_GIL_DISABLED, warnings_helper
def mul(x, y):
# Executor.shutdown() and context manager usage is tested by
# ExecutorShutdownTest.
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_submit(self):
future = self.executor.submit(pow, 2, 8)
self.assertEqual(256, future.result())
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_submit_keyword(self):
future = self.executor.submit(mul, 2, y=8)
self.assertEqual(16, future.result())
with self.assertRaises(TypeError):
self.executor.submit(arg=1)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_map(self):
self.assertEqual(
list(self.executor.map(pow, range(10), range(10))),
list(self.executor.map(pow, range(10), range(10), chunksize=3)),
list(map(pow, range(10), range(10))))
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_map_exception(self):
i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
self.assertEqual(i.__next__(), (0, 1))
with self.assertRaises(ZeroDivisionError):
i.__next__()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@support.requires_resource('walltime')
def test_map_timeout(self):
results = []
):
self.executor.map(str, range(4), buffersize=buffersize)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_map_buffersize(self):
ints = range(4)
for buffersize in (1, 2, len(ints), len(ints) * 2):
res = self.executor.map(str, ints, buffersize=buffersize)
self.assertListEqual(list(res), ["0", "1", "2", "3"])
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_map_buffersize_on_multiple_iterables(self):
ints = range(4)
for buffersize in (1, 2, len(ints), len(ints) * 2):
res = self.executor.map(add, ints, ints, buffersize=buffersize)
self.assertListEqual(list(res), [0, 2, 4, 6])
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_map_buffersize_on_infinite_iterable(self):
res = self.executor.map(str, itertools.count(), buffersize=2)
self.assertEqual(next(res, None), "0")
self.assertEqual(next(res, None), "1")
self.assertEqual(next(res, None), "2")
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_map_buffersize_on_multiple_infinite_iterables(self):
res = self.executor.map(
add,
res = self.executor.map(str, buffersize=2)
self.assertIsNone(next(res, None))
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_map_buffersize_when_buffer_is_full(self):
ints = iter(range(4))
buffersize = 2
msg="should have fetched only `buffersize` elements from `ints`.",
)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_shutdown_race_issue12456(self):
# Issue #12456: race condition at shutdown where trying to post a
# sentinel in the call queue blocks (the queue is full while processes
self.executor.map(str, [2] * (self.worker_count + 1))
self.executor.shutdown()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@support.cpython_only
def test_no_stale_references(self):
# Issue #16284: check that the executors don't unnecessarily hang onto
"than 0"):
self.executor_type(max_workers=number)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_free_reference(self):
# Issue #14406: Result iterator should not keep an internal
# reference to result objects.
if wr() is None:
break
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_swallows_falsey_exceptions(self):
# see gh-132063: Prevent exceptions that evaluate as falsey
# from being ignored.
CANCELLED_AND_NOTIFIED, FINISHED, Future)
from test import support
+from test.support import warnings_helper
from .util import (
PENDING_FUTURE, RUNNING_FUTURE,
class AsCompletedTests:
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_no_timeout(self):
future1 = self.executor.submit(mul, 2, 21)
future2 = self.executor.submit(mul, 7, 6)
future1, future2]),
completed)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_future_times_out(self):
"""Test ``futures.as_completed`` timing out before
completing it's final future."""
# Check that ``future`` wasn't completed.
self.assertEqual(completed_futures, already_completed)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_duplicate_futures(self):
# Issue 20367. Duplicate futures should not raise exceptions or give
# duplicate responses.
from concurrent.futures.process import BrokenProcessPool, _ThreadWakeup
from test import support
+from test.support import warnings_helper
from .util import (
create_executor_tests, setup_module,
print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
self.fail(f"Executor deadlock:\n\n{tb}")
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def _check_error(self, error, func, *args, ignore_stderr=False):
# test for deadlock caused by crashes or exiting in a pool
self.executor.shutdown(wait=True)
# the result_handler thread
self._check_error(BrokenProcessPool, _return_instance, ExitAtUnpickle)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@support.skip_if_sanitizer("UBSan: explicit SIGSEV not allowed", ub=True)
def test_shutdown_deadlock(self):
# Test that the pool calling shutdown do not cause deadlock
with self.assertRaises(BrokenProcessPool):
f.result()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_shutdown_deadlock_pickle(self):
# Test that the pool calling shutdown with wait=False does not cause
# a deadlock if a task fails at pickle after the shutdown call.
# dangling threads
executor_manager.join()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@support.skip_if_sanitizer("UBSan: explicit SIGSEV not allowed", ub=True)
def test_crash_big_data(self):
# Test that there is a clean exception instead of a deadlock when a
executor.shutdown(wait=True)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self):
# Issue #105829: The _ExecutorManagerThread wakeup pipe could
# fill up and block. See: https://github.com/python/cpython/issues/105829
from logging.handlers import QueueHandler
from test import support
+from test.support import warnings_helper
from .util import ExecutorMixin, create_executor_tests, setup_module
initargs=('initialized',))
super().setUp()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_initializer(self):
futures = [self.executor.submit(get_init_status)
for _ in range(self.worker_count)]
self.executor_kwargs = dict(initializer=init_fail)
super().setUp()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_initializer(self):
with self._assert_logged('ValueError: error in initializer'):
try:
from concurrent.futures.process import BrokenProcessPool
from test import support
-from test.support import hashlib_helper
+from test.support import hashlib_helper, warnings_helper
from test.test_importlib.metadata.fixtures import parameterize
from .executor import ExecutorTest, mul
"max_workers must be <= 61"):
futures.ProcessPoolExecutor(max_workers=62)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_killed_child(self):
# When a child process is abruptly terminated, the whole pool gets
# "broken".
# Submitting other jobs fails as well.
self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_map_chunksize(self):
def bad_map():
list(self.executor.map(pow, range(40), range(40), chunksize=-1))
def _test_traceback(cls):
raise RuntimeError(123) # some comment
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_traceback(self):
# We want ensure that the traceback from the child process is
# contained in the traceback raised in the main process.
self.assertIn('raise RuntimeError(123) # some comment',
f1.getvalue())
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@hashlib_helper.requires_hashdigest('md5')
def test_ressources_gced_in_workers(self):
# Ensure that argument for a job are correctly gc-ed after the job
mgr.shutdown()
mgr.join()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_saturation(self):
executor = self.executor
mp_context = self.get_context()
for i, future in enumerate(futures):
self.assertEqual(future.result(), mul(i, i))
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_python_finalization_error(self):
# gh-109047: Catch RuntimeError on thread creation
# during Python finalization.
executor._force_shutdown,
operation='invalid operation'),
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@parameterize(*FORCE_SHUTDOWN_PARAMS)
def test_force_shutdown_workers(self, function_name):
manager = self.get_context().Manager()
from concurrent import futures
from test import support
+from test.support import warnings_helper
from test.support.script_helper import assert_python_ok
from .util import (
self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
self.assertEqual(out.strip(), b"runtime-error")
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_hang_issue12364(self):
fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
self.executor.shutdown()
for f in fs:
f.result()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_cancel_futures(self):
assert self.worker_count <= 5, "test needs few workers"
fs = [self.executor.submit(time.sleep, .1) for _ in range(50)]
self.assertFalse(err)
self.assertEqual(out.strip(), b"apple")
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_hang_gh94440(self):
"""shutdown(wait=True) doesn't hang when a future was submitted and
quickly canceled right before shutdown.
for t in self.executor._threads:
t.join()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_context_manager_shutdown(self):
with futures.ThreadPoolExecutor(max_workers=5) as e:
executor = e
for t in executor._threads:
t.join()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_del_shutdown(self):
executor = futures.ThreadPoolExecutor(max_workers=5)
res = executor.map(abs, range(-5, 5))
# executor got shutdown.
assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_shutdown_no_wait(self):
# Ensure that the executor cleans up the threads when calling
# shutdown with wait=False
# executor got shutdown.
assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
-
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_thread_names_assigned(self):
executor = futures.ThreadPoolExecutor(
max_workers=5, thread_name_prefix='SpecialPool')
self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
t.join()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_thread_names_default(self):
executor = futures.ThreadPoolExecutor(max_workers=5)
executor.map(abs, range(-5, 5))
class ProcessPoolShutdownTest(ExecutorShutdownTest):
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_processes_terminate(self):
def acquire_lock(lock):
lock.acquire()
for p in processes.values():
p.join()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_context_manager_shutdown(self):
with futures.ProcessPoolExecutor(
max_workers=5, mp_context=self.get_context()) as e:
for p in processes.values():
p.join()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_del_shutdown(self):
executor = futures.ProcessPoolExecutor(
max_workers=5, mp_context=self.get_context())
# executor got shutdown.
assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_shutdown_no_wait(self):
# Ensure that the executor cleans up the processes when calling
# shutdown with wait=False
import unittest
from concurrent import futures
from test import support
+from test.support import warnings_helper
from .executor import ExecutorTest, mul
from .util import BaseTestCase, ThreadPoolMixin, setup_module
@support.requires_fork()
@unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork')
@support.requires_resource('cpu')
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_hang_global_shutdown_lock(self):
# bpo-45021: _global_shutdown_lock should be reinitialized in the child
# process, otherwise it will never exit
@support.requires_fork()
@unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork')
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_process_fork_from_a_threadpool(self):
# bpo-43944: clear concurrent.futures.thread._threads_queues after fork,
# otherwise child process will try to join parent thread
import unittest
from concurrent import futures
from test import support
-from test.support import threading_helper
+from test.support import threading_helper, warnings_helper
from .util import (
CANCELLED_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE,
class WaitTests:
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_20369(self):
# See https://bugs.python.org/issue20369
future = self.executor.submit(mul, 1, 2)
self.assertEqual({future}, done)
self.assertEqual(set(), not_done)
-
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_first_completed(self):
event = self.create_event()
future1 = self.executor.submit(mul, 21, 2)
event.set()
future2.result() # wait for job to finish
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_first_completed_some_already_completed(self):
event = self.create_event()
future1 = self.executor.submit(event.wait)
event.set()
future1.result() # wait for job to finish
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_first_exception(self):
event1 = self.create_event()
event2 = self.create_event()
event2.set()
future3.result() # wait for job to finish
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_first_exception_some_already_complete(self):
event = self.create_event()
future1 = self.executor.submit(divmod, 21, 0)
event.set()
future2.result() # wait for job to finish
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_first_exception_one_already_failed(self):
event = self.create_event()
future1 = self.executor.submit(event.wait)
event.set()
future1.result() # wait for job to finish
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_all_completed(self):
future1 = self.executor.submit(divmod, 2, 0)
future2 = self.executor.submit(mul, 2, 21)
future2]), finished)
self.assertEqual(set(), pending)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_timeout(self):
short_timeout = 0.050
from concurrent.futures.process import _check_system_limits
from test import support
-from test.support import threading_helper
+from test.support import threading_helper, warnings_helper
def create_future(state=PENDING, exception=None, result=None):
max_workers=self.worker_count,
mp_context=self.get_context(),
**self.executor_kwargs)
- self.manager = self.get_context().Manager()
+ with warnings_helper.ignore_fork_in_thread_deprecation_warnings():
+ self.manager = self.get_context().Manager()
else:
self.executor = self.executor_type(
max_workers=self.worker_count,
from test.fork_wait import ForkWait
from test import support
+from test.support import warnings_helper
# Skip test if fork does not exist.
class ForkTest(ForkWait):
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_threaded_import_lock_fork(self):
"""Check fork() in main thread works while a subthread is doing an import"""
import_started = threading.Event()
except OSError:
pass
-
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_nested_import_lock_fork(self):
"""Check fork() in main thread works while the main thread is doing an import"""
exitcode = 42
import time
import unittest
+from test.support import warnings_helper
+
if not hasattr(select, "kqueue"):
raise unittest.SkipTest("test works only on BSD")
self.addCleanup(kqueue.close)
self.assertEqual(os.get_inheritable(kqueue.fileno()), False)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@support.requires_fork()
def test_fork(self):
# gh-110395: kqueue objects must be closed after fork
# based on os.fork existing because that is what users and this test use.
# This helps ensure that when fork exists (the important concept) that the
# register_at_fork mechanism is also present and used.
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@support.requires_fork()
@threading_helper.requires_working_threading()
@skip_if_asan_fork
self._apply_simple_queue_listener_configuration(qspec)
manager.assert_not_called()
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@skip_if_tsan_fork
@support.requires_subprocess()
@unittest.skipUnless(support.Py_DEBUG, "requires a debug build for testing"
with self.assertRaises(ValueError):
self._apply_simple_queue_listener_configuration(qspec)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@skip_if_tsan_fork
@support.requires_subprocess()
@unittest.skipUnless(support.Py_DEBUG, "requires a debug build for testing"
# log a message (this creates a record put in the queue)
logging.getLogger().info(message_to_log)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@skip_if_tsan_fork
@support.requires_subprocess()
def test_multiprocessing_queues(self):
else:
return results
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@skip_if_tsan_fork
def test_multiprocessing(self):
support.skip_if_broken_multiprocessing_synchronize()
import io
import tempfile
from test import support
-from test.support import import_helper
+from test.support import import_helper, warnings_helper
from test.support import os_helper
from test.support import refleak_helper
from test.support import socket_helper
self.assertEqual(contents, f.read())
self._box = self._factory(self._path)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@support.requires_fork()
@unittest.skipUnless(hasattr(socket, 'socketpair'), "Test needs socketpair().")
def test_lock_conflict(self):
self.assertEqual(error, b'')
self.assertEqual(int(stdout), os.getpid())
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def check_waitpid(self, code, exitcode, callback=None):
if sys.platform == 'win32':
# On Windows, os.spawnv() simply joins arguments with spaces:
return program, args
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnl')
def test_spawnl(self):
program, args = self.create_args()
exitcode = os.spawnl(os.P_WAIT, program, *args)
self.assertEqual(exitcode, self.exitcode)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnle')
def test_spawnle(self):
program, args = self.create_args(with_env=True)
exitcode = os.spawnle(os.P_WAIT, program, *args, self.env)
self.assertEqual(exitcode, self.exitcode)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnlp')
def test_spawnlp(self):
program, args = self.create_args()
exitcode = os.spawnlp(os.P_WAIT, program, *args)
self.assertEqual(exitcode, self.exitcode)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnlpe')
def test_spawnlpe(self):
program, args = self.create_args(with_env=True)
exitcode = os.spawnlpe(os.P_WAIT, program, *args, self.env)
self.assertEqual(exitcode, self.exitcode)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnv')
def test_spawnv(self):
program, args = self.create_args()
exitcode = os.spawnv(os.P_WAIT, FakePath(program), args)
self.assertEqual(exitcode, self.exitcode)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnve')
def test_spawnve(self):
program, args = self.create_args(with_env=True)
exitcode = os.spawnve(os.P_WAIT, program, args, self.env)
self.assertEqual(exitcode, self.exitcode)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnvp')
def test_spawnvp(self):
program, args = self.create_args()
exitcode = os.spawnvp(os.P_WAIT, program, args)
self.assertEqual(exitcode, self.exitcode)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnvpe')
def test_spawnvpe(self):
program, args = self.create_args(with_env=True)
exitcode = os.spawnvpe(os.P_WAIT, program, args, self.env)
self.assertEqual(exitcode, self.exitcode)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnv')
def test_nowait(self):
program, args = self.create_args()
pid = os.spawnv(os.P_NOWAIT, program, args)
support.wait_process(pid, exitcode=self.exitcode)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnve')
def test_spawnve_bytes(self):
# Test bytes handling in parse_arglist and parse_envlist (#28114)
exitcode = os.spawnve(os.P_WAIT, program, args, self.env)
self.assertEqual(exitcode, self.exitcode)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnl')
def test_spawnl_noargs(self):
program, __ = self.create_args()
self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, program)
self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, program, '')
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnle')
def test_spawnle_noargs(self):
program, __ = self.create_args()
self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, program, {})
self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, program, '', {})
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnv')
def test_spawnv_noargs(self):
program, __ = self.create_args()
self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, program, ('',))
self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, program, [''])
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnve')
def test_spawnve_noargs(self):
program, __ = self.create_args()
exitcode = spawn(os.P_WAIT, program, args, newenv)
self.assertEqual(exitcode, 0)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnve')
def test_spawnve_invalid_env(self):
self._test_invalid_env(os.spawnve)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_os_func('spawnvpe')
def test_spawnvpe_invalid_env(self):
self._test_invalid_env(os.spawnvpe)
self.addCleanup(os.close, son_fd)
self.assertEqual(os.ptsname(mother_fd), os.ttyname(son_fd))
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@unittest.skipUnless(hasattr(os, 'spawnl'), "need os.spawnl()")
@support.requires_subprocess()
def test_pipe_spawnl(self):
from unittest import mock
from test import support
-from test.support import os_helper
+from test.support import os_helper, warnings_helper
try:
# Some of the iOS tests need ctypes to operate.
else:
self.assertEqual(res[2], 'PowerPC')
-
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@unittest.skipUnless(sys.platform == 'darwin', "OSX only test")
def test_mac_ver_with_fork(self):
# Issue7895: platform.mac_ver() crashes when using fork without exec
import unittest
from test.support import (
- is_android, is_apple_mobile, is_wasm32, reap_children, verbose
+ is_android, is_apple_mobile, is_wasm32, reap_children, verbose, warnings_helper
)
from test.support.import_helper import import_module
from test.support.os_helper import TESTFN, unlink
s2 = _readline(master_fd)
self.assertEqual(b'For my pet fish, Eric.\n', normalize_output(s2))
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_fork(self):
debug("calling pty.fork()")
pid, master_fd = pty.fork()
self.assertEqual(data, b"")
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_spawn_doesnt_hang(self):
self.addCleanup(unlink, TESTFN)
with open(TESTFN, 'wb') as f:
from fractions import Fraction
from collections import abc, Counter
+from test.support import warnings_helper
+
class MyIndex:
def __init__(self, value):
# tests validity but not completeness of the __all__ list
self.assertTrue(set(random.__all__) <= set(dir(random)))
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@test.support.requires_fork()
def test_after_fork(self):
# Test the global Random instance gets reseeded in child
from test.support import os_helper
from test.support import socket_helper
from test.support import threading_helper
+from test.support import warnings_helper
test.support.requires("network")
raise RuntimeError("timed out on %r" % (sock,))
+@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@test.support.requires_fork()
@contextlib.contextmanager
def simple_subprocess(testcase):
socketserver.StreamRequestHandler,
self.stream_examine)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_forking
def test_ForkingTCPServer(self):
with simple_subprocess(self):
socketserver.StreamRequestHandler,
self.stream_examine)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_unix_sockets
@requires_forking
def test_ForkingUnixStreamServer(self):
socketserver.DatagramRequestHandler,
self.dgram_examine)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_forking
def test_ForkingUDPServer(self):
with simple_subprocess(self):
socketserver.DatagramRequestHandler,
self.dgram_examine)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_unix_sockets
@requires_forking
def test_ForkingUnixDatagramServer(self):
self.assertIs(cm.exc_type, SystemExit)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_forking
def test_forking_handled(self):
ForkingErrorTestServer(ValueError)
self.check_result(handled=True)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@requires_forking
def test_forking_not_handled(self):
ForkingErrorTestServer(SystemExit)
self.assertRaises(AssertionError, support.check__all__, self, unittest)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@unittest.skipUnless(hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG'),
'need os.waitpid() and os.WNOHANG')
@support.requires_fork()
from test.support.script_helper import (assert_python_ok, assert_python_failure,
interpreter_requires_environment)
from test import support
-from test.support import force_not_colorized
+from test.support import force_not_colorized, warnings_helper
from test.support import os_helper
from test.support import threading_helper
# everything is fine
return 0
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@support.requires_fork()
def test_fork(self):
# check that tracemalloc is still working after fork
from unittest import mock
from test import support
-from test.support import import_helper
+from test.support import import_helper, warnings_helper
from test.support.script_helper import assert_python_ok
py_uuid = import_helper.import_fresh_module('uuid', blocked=['_uuid'])
versions = {u.version for u in uuids}
self.assertSetEqual(versions, {8})
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
@support.requires_fork()
def testIssue8621(self):
# On at least some versions of OSX self.uuid.uuid4 generates
--- /dev/null
+With :option:`-Werror <-W>`, the :exc:`DeprecationWarning` emitted by :py:func:`os.fork`
+and :py:func:`os.forkpty` in multi-threaded processes is now raised as an exception.
+Previously it was silently ignored.
+Patch by Rani Pinchuk.
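A minimal sketch of the new behavior (hypothetical demo script, not part of this
patch; it assumes a POSIX platform where `os.fork()` is available):

    # fork_warning_demo.py -- illustrative only
    import os
    import threading

    stop = threading.Event()
    t = threading.Thread(target=stop.wait)
    t.start()                      # the process is now multi-threaded
    try:
        pid = os.fork()            # parent: emits the DeprecationWarning;
                                   # under -W error it now raises instead
        if pid == 0:
            os._exit(0)            # child: exit immediately
        os.waitpid(pid, 0)         # parent: reap the child
    finally:
        stop.set()
        t.join()

Run as `python -W error::DeprecationWarning fork_warning_demo.py`: before this
change the error raised by the warning machinery was cleared and the script
exited silently; now `os.fork()` propagates the DeprecationWarning in the
parent. The child is still created, because the warning is only issued in the
parent after the fork.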
//
// This should only be called from the parent process after
// PyOS_AfterFork_Parent().
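+// Returns 0 on success (including when no warning is emitted), or -1 with an
+// exception set if the warning is turned into an error (e.g. under -W error).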
-static void
+static int
warn_about_fork_with_threads(const char* name)
{
// It's not safe to issue the warning while the world is stopped, because
PyObject *threading = PyImport_GetModule(&_Py_ID(threading));
if (!threading) {
PyErr_Clear();
- return;
+ return 0;
}
PyObject *threading_active =
PyObject_GetAttr(threading, &_Py_ID(_active));
if (!threading_active) {
PyErr_Clear();
Py_DECREF(threading);
- return;
+ return 0;
}
PyObject *threading_limbo =
PyObject_GetAttr(threading, &_Py_ID(_limbo));
PyErr_Clear();
Py_DECREF(threading);
Py_DECREF(threading_active);
- return;
+ return 0;
}
Py_DECREF(threading);
// Duplicating what threading.active_count() does but without holding
Py_DECREF(threading_limbo);
}
if (num_python_threads > 1) {
- PyErr_WarnFormat(
+ return PyErr_WarnFormat(
PyExc_DeprecationWarning, 1,
#ifdef HAVE_GETPID
"This process (pid=%d) is multi-threaded, "
getpid(),
#endif
name);
- PyErr_Clear();
}
+ return 0;
}
#endif // HAVE_FORK1 || HAVE_FORKPTY || HAVE_FORK
/* parent: release the import lock. */
PyOS_AfterFork_Parent();
// After PyOS_AfterFork_Parent() starts the world to avoid deadlock.
- warn_about_fork_with_threads("fork1");
+ if (warn_about_fork_with_threads("fork1") < 0) {
+ return NULL;
+ }
}
if (pid == -1) {
errno = saved_errno;
/* parent: release the import lock. */
PyOS_AfterFork_Parent();
// After PyOS_AfterFork_Parent() starts the world to avoid deadlock.
- warn_about_fork_with_threads("fork");
+            if (warn_about_fork_with_threads("fork") < 0) {
+                return NULL;
+            }
}
if (pid == -1) {
errno = saved_errno;
/* parent: release the import lock. */
PyOS_AfterFork_Parent();
// After PyOS_AfterFork_Parent() starts the world to avoid deadlock.
- warn_about_fork_with_threads("forkpty");
+            if (warn_about_fork_with_threads("forkpty") < 0) {
+                return NULL;
+            }
}
if (pid == -1) {
return posix_error();