From: Gregory P. Smith <68491+gpshead@users.noreply.github.com> Date: Sun, 12 Apr 2026 06:06:19 +0000 (-0700) Subject: gh-146313: Fix multiprocessing ResourceTracker deadlock after os.fork() (GH-146316) X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=3a7df632c96eb6c5de12fac08d1da42df9e25334;p=thirdparty%2FPython%2Fcpython.git gh-146313: Fix multiprocessing ResourceTracker deadlock after os.fork() (GH-146316) `ResourceTracker.__del__` (added in gh-88887 circa Python 3.12) calls os.waitpid(pid, 0) which blocks indefinitely if a process created via os.fork() still holds the tracker pipe's write end. The tracker never sees EOF, never exits, and the parent hangs at interpreter shutdown. Fix with two layers: - **At-fork handler.** An os.register_at_fork(after_in_child=...) handler closes the inherited pipe fd in the child unless a preserve flag is set. popen_fork.Popen._launch() sets the flag before its fork so mp.Process(fork) children keep the fd and reuse the parent's tracker (preserving gh-80849). Raw os.fork() children close the fd, letting the parent reap promptly. - **Timeout safety-net.** _stop_locked() gains a wait_timeout parameter. When called from `__del__`, it polls with WNOHANG using exponential backoff for up to 1 second instead of blocking indefinitely. The at-fork handler makes this unreachable in well-behaved paths; it remains for abnormal shutdowns. Co-authored-by: Itamar Oren --- diff --git a/Lib/multiprocessing/popen_fork.py b/Lib/multiprocessing/popen_fork.py index 7affa1b985f0..a02a53b6a176 100644 --- a/Lib/multiprocessing/popen_fork.py +++ b/Lib/multiprocessing/popen_fork.py @@ -67,7 +67,17 @@ class Popen(object): code = 1 parent_r, child_w = os.pipe() child_r, parent_w = os.pipe() - self.pid = os.fork() + # gh-146313: Tell the resource tracker's at-fork handler to keep + # the inherited pipe fd so this child reuses the parent's tracker + # (gh-80849) rather than closing it and launching its own. 
+ from .resource_tracker import _fork_intent + _fork_intent.preserve_fd = True + try: + self.pid = os.fork() + finally: + # Reset in both parent and child so the flag does not leak + # into a subsequent raw os.fork() or nested Process launch. + _fork_intent.preserve_fd = False if self.pid == 0: try: atexit._clear() diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index 3606d1effb49..d3328a8c6170 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -20,6 +20,7 @@ import os import signal import sys import threading +import time import warnings from collections import deque @@ -75,6 +76,10 @@ class ResourceTracker(object): # The reader should understand all formats. self._use_simple_format = False + # Set to True by _stop_locked() if the waitpid polling loop ran to + # its timeout without reaping the tracker. Exposed for tests. + self._waitpid_timed_out = False + def _reentrant_call_error(self): # gh-109629: this happens if an explicit call to the ResourceTracker # gets interrupted by a garbage collection, invoking a finalizer (*) @@ -87,16 +92,51 @@ class ResourceTracker(object): # making sure child processess are cleaned before ResourceTracker # gets destructed. # see https://github.com/python/cpython/issues/88887 - self._stop(use_blocking_lock=False) + # gh-146313: use a timeout to avoid deadlocking if a forked child + # still holds the pipe's write end open. + self._stop(use_blocking_lock=False, wait_timeout=1.0) + + def _after_fork_in_child(self): + # gh-146313: Called in the child right after os.fork(). + # + # The tracker process is a child of the *parent*, not of us, so we + # could never waitpid() it anyway. Clearing _pid means our __del__ + # becomes a no-op (the early return for _pid is None). 
+ # + # Whether we keep the inherited _fd depends on who forked us: + # + # - multiprocessing.Process with the 'fork' start method sets + # _fork_intent.preserve_fd before forking. The child keeps the + # fd and reuses the parent's tracker (gh-80849). This is safe + # because multiprocessing's atexit handler joins all children + # before the parent's __del__ runs, so by then the fd copies + # are gone and the parent can reap the tracker promptly. + # + # - A raw os.fork() leaves the flag unset. We close the fd in the child after forking so + # the parent's __del__ can reap the tracker without waiting + # for the child to exit. If we later need a tracker, ensure_running() + # will launch a fresh one. + self._lock._at_fork_reinit() + self._reentrant_messages.clear() + self._pid = None + self._exitcode = None + if (self._fd is not None and + not getattr(_fork_intent, 'preserve_fd', False)): + fd = self._fd + self._fd = None + try: + os.close(fd) + except OSError: + pass - def _stop(self, use_blocking_lock=True): + def _stop(self, use_blocking_lock=True, wait_timeout=None): if use_blocking_lock: with self._lock: - self._stop_locked() + self._stop_locked(wait_timeout=wait_timeout) else: acquired = self._lock.acquire(blocking=False) try: - self._stop_locked() + self._stop_locked(wait_timeout=wait_timeout) finally: if acquired: self._lock.release() @@ -106,6 +146,10 @@ class ResourceTracker(object): close=os.close, waitpid=os.waitpid, waitstatus_to_exitcode=os.waitstatus_to_exitcode, + monotonic=time.monotonic, + sleep=time.sleep, + WNOHANG=getattr(os, 'WNOHANG', None), + wait_timeout=None, ): # This shouldn't happen (it might when called by a finalizer) # so we check for it anyway. 
@@ -122,7 +166,30 @@ class ResourceTracker(object): self._fd = None try: - _, status = waitpid(self._pid, 0) + if wait_timeout is None: + _, status = waitpid(self._pid, 0) + else: + # gh-146313: A forked child may still hold the pipe's write + # end open, preventing the tracker from seeing EOF and + # exiting. Poll with WNOHANG to avoid blocking forever. + deadline = monotonic() + wait_timeout + delay = 0.001 + while True: + result_pid, status = waitpid(self._pid, WNOHANG) + if result_pid != 0: + break + remaining = deadline - monotonic() + if remaining <= 0: + # The tracker is still running; it will be + # reparented to PID 1 (or the nearest subreaper) + # when we exit, and reaped there once all pipe + # holders release their fd. + self._pid = None + self._exitcode = None + self._waitpid_timed_out = True + return + delay = min(delay * 2, remaining, 0.1) + sleep(delay) except ChildProcessError: self._pid = None self._exitcode = None @@ -308,12 +375,24 @@ class ResourceTracker(object): self._ensure_running_and_write(msg) +# gh-146313: Per-thread flag set by .popen_fork.Popen._launch() just before +# os.fork(), telling _after_fork_in_child() to keep the inherited pipe fd so +# the child can reuse this tracker (gh-80849). Unset for raw os.fork() calls, +# where the child instead closes the fd so the parent's __del__ can reap the +# tracker. Using threading.local() keeps multiple threads calling +# popen_fork.Popen._launch() at once from clobbering each other's intent. +_fork_intent = threading.local() + _resource_tracker = ResourceTracker() ensure_running = _resource_tracker.ensure_running register = _resource_tracker.register unregister = _resource_tracker.unregister getfd = _resource_tracker.getfd +# gh-146313: See the comments in _after_fork_in_child(). 
+if hasattr(os, 'register_at_fork'): + os.register_at_fork(after_in_child=_resource_tracker._after_fork_in_child) + def _decode_message(line): if line.startswith(b'{'): diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 69174cff6991..580d9f2b3254 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -6321,8 +6321,9 @@ class TestResourceTracker(unittest.TestCase): def _is_resource_tracker_reused(conn, pid): from multiprocessing.resource_tracker import _resource_tracker _resource_tracker.ensure_running() - # The pid should be None in the child process, expect for the fork - # context. It should not be a new value. + # The pid should be None in the child (the at-fork handler clears + # it for fork; spawn/forkserver children never had it set). It + # should not be a new value. reused = _resource_tracker._pid in (None, pid) reused &= _resource_tracker._check_alive() conn.send(reused) @@ -6408,6 +6409,183 @@ class TestResourceTracker(unittest.TestCase): # restore sigmask to what it was before executing test signal.pthread_sigmask(signal.SIG_SETMASK, orig_sigmask) + @only_run_in_forkserver_testsuite("avoids redundant testing.") + def test_resource_tracker_fork_deadlock(self): + # gh-146313: ResourceTracker.__del__ used to deadlock if a forked + # child still held the pipe's write end open when the parent + # exited, because the parent would block in waitpid() waiting for + # the tracker to exit, but the tracker would never see EOF. 
+ cmd = '''if 1: + import os, signal + from multiprocessing.resource_tracker import ensure_running + ensure_running() + if os.fork() == 0: + signal.pause() + os._exit(0) + # parent falls through and exits, triggering __del__ + ''' + proc = subprocess.Popen([sys.executable, '-c', cmd], + start_new_session=True) + try: + try: + proc.wait(timeout=support.SHORT_TIMEOUT) + except subprocess.TimeoutExpired: + self.fail( + "Parent process deadlocked in ResourceTracker.__del__" + ) + self.assertEqual(proc.returncode, 0) + finally: + try: + os.killpg(proc.pid, signal.SIGKILL) + except ProcessLookupError: + pass + proc.wait() + + @only_run_in_forkserver_testsuite("avoids redundant testing.") + def test_resource_tracker_mp_fork_reuse_and_prompt_reap(self): + # gh-146313 / gh-80849: A child started via multiprocessing.Process + # with the 'fork' start method should reuse the parent's resource + # tracker (the at-fork handler preserves the inherited pipe fd), + # *and* the parent should be able to reap the tracker promptly + # after joining the child, without hitting the waitpid timeout. + cmd = textwrap.dedent(''' + import multiprocessing as mp + from multiprocessing.resource_tracker import _resource_tracker + + def child(conn): + # Prove we can talk to the parent's tracker by registering + # and unregistering a dummy resource over the inherited fd. + # If the fd were closed, ensure_running would launch a new + # tracker and _pid would be non-None. 
+ _resource_tracker.register("x", "dummy") + _resource_tracker.unregister("x", "dummy") + conn.send((_resource_tracker._fd is not None, + _resource_tracker._pid is None, + _resource_tracker._check_alive())) + + if __name__ == "__main__": + mp.set_start_method("fork") + _resource_tracker.ensure_running() + r, w = mp.Pipe(duplex=False) + p = mp.Process(target=child, args=(w,)) + p.start() + child_has_fd, child_pid_none, child_alive = r.recv() + p.join() + w.close(); r.close() + + # Now simulate __del__: the child has exited and released + # its fd copy, so the tracker should see EOF and exit + # promptly -- no timeout. + _resource_tracker._stop(wait_timeout=5.0) + print(child_has_fd, child_pid_none, child_alive, + _resource_tracker._waitpid_timed_out, + _resource_tracker._exitcode) + ''') + rc, out, err = script_helper.assert_python_ok('-c', cmd) + parts = out.decode().split() + self.assertEqual(parts, ['True', 'True', 'True', 'False', '0'], + f"unexpected: {parts!r} stderr={err!r}") + + @only_run_in_forkserver_testsuite("avoids redundant testing.") + def test_resource_tracker_raw_fork_prompt_reap(self): + # gh-146313: After a raw os.fork() the at-fork handler closes the + # child's inherited fd, so the parent can reap the tracker + # immediately -- even while the child is still alive -- rather + # than waiting out the 1s timeout. + cmd = textwrap.dedent(''' + import os, signal + from multiprocessing.resource_tracker import _resource_tracker + + _resource_tracker.ensure_running() + r, w = os.pipe() + pid = os.fork() + if pid == 0: + os.close(r) + # Report whether our fd was closed by the at-fork handler. + os.write(w, b"1" if _resource_tracker._fd is None else b"0") + os.close(w) + signal.pause() # stay alive so parent's reap is meaningful + os._exit(0) + os.close(w) + child_fd_closed = os.read(r, 1) == b"1" + os.close(r) + + # Child is still alive and paused. Because it closed its fd + # copy, our close below is the last one and the tracker exits. 
+ _resource_tracker._stop(wait_timeout=5.0) + + os.kill(pid, signal.SIGKILL) + os.waitpid(pid, 0) + print(child_fd_closed, + _resource_tracker._waitpid_timed_out, + _resource_tracker._exitcode) + ''') + rc, out, err = script_helper.assert_python_ok('-c', cmd) + parts = out.decode().split() + self.assertEqual(parts, ['True', 'False', '0'], + f"unexpected: {parts!r} stderr={err!r}") + + @only_run_in_forkserver_testsuite("avoids redundant testing.") + def test_resource_tracker_lock_reinit_after_fork(self): + # gh-146313: If a parent thread held the tracker's lock at fork + # time, the child would inherit the held lock and deadlock on + # its next ensure_running(). The at-fork handler reinits it. + cmd = textwrap.dedent(''' + import os, threading + from multiprocessing.resource_tracker import _resource_tracker + + held = threading.Event() + release = threading.Event() + def hold(): + with _resource_tracker._lock: + held.set() + release.wait() + t = threading.Thread(target=hold) + t.start() + held.wait() + + pid = os.fork() + if pid == 0: + ok = _resource_tracker._lock.acquire(timeout=5.0) + os._exit(0 if ok else 1) + + release.set() + t.join() + _, status = os.waitpid(pid, 0) + print(os.waitstatus_to_exitcode(status)) + ''') + rc, out, err = script_helper.assert_python_ok( + '-W', 'ignore::DeprecationWarning', '-c', cmd) + self.assertEqual(out.strip(), b'0', + f"child failed to acquire lock: stderr={err!r}") + + @only_run_in_forkserver_testsuite("avoids redundant testing.") + def test_resource_tracker_safety_net_timeout(self): + # gh-146313: When an mp.Process(fork) child holds the preserved + # fd and the parent calls _stop() without joining (simulating + # abnormal shutdown), the safety-net timeout should fire rather + # than deadlocking. 
+ cmd = textwrap.dedent(''' + import multiprocessing as mp + import signal + from multiprocessing.resource_tracker import _resource_tracker + + if __name__ == "__main__": + mp.set_start_method("fork") + _resource_tracker.ensure_running() + p = mp.Process(target=signal.pause) + p.start() + # Stop WITHOUT joining -- child still holds preserved fd + _resource_tracker._stop(wait_timeout=0.5) + print(_resource_tracker._waitpid_timed_out) + p.terminate() + p.join() + ''') + rc, out, err = script_helper.assert_python_ok('-c', cmd) + self.assertEqual(out.strip(), b'True', + f"safety-net timeout did not fire: stderr={err!r}") + + class TestSimpleQueue(unittest.TestCase): @classmethod diff --git a/Misc/NEWS.d/next/Library/2026-03-22-23-42-22.gh-issue-146313.RtDeAd.rst b/Misc/NEWS.d/next/Library/2026-03-22-23-42-22.gh-issue-146313.RtDeAd.rst new file mode 100644 index 000000000000..1beea3694c42 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2026-03-22-23-42-22.gh-issue-146313.RtDeAd.rst @@ -0,0 +1,4 @@ +Fix a deadlock in :mod:`multiprocessing`'s resource tracker +where the parent process could hang indefinitely in :func:`os.waitpid` +during interpreter shutdown if a child created via :func:`os.fork` still +held the resource tracker's pipe open.