]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-131788: make resource_tracker re-entrant safe (GH-131787)
authorThomas Grainger <tagrain@gmail.com>
Wed, 13 Aug 2025 20:00:23 +0000 (21:00 +0100)
committerGitHub <noreply@github.com>
Wed, 13 Aug 2025 20:00:23 +0000 (20:00 +0000)
* make resource_tracker re-entrant safe
* Update Lib/multiprocessing/resource_tracker.py
* trim trailing whitespace
* use f-string and args = [x, *y, z]
* raise self._reentrant_call_error

---------

Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
Co-authored-by: Gregory P. Smith <greg@krypto.org>
Lib/multiprocessing/resource_tracker.py
Misc/NEWS.d/next/Library/2025-03-27-08-13-32.gh-issue-131788.0RWiFc.rst [new file with mode: 0644]

index c4d0ca81e7034a92a0033856b29f17603dbea2bc..c53092f6e34b322922fee78a036487048bfb652f 100644 (file)
@@ -20,6 +20,7 @@ import signal
 import sys
 import threading
 import warnings
+from collections import deque
 
 from . import spawn
 from . import util
@@ -62,6 +63,7 @@ class ResourceTracker(object):
         self._fd = None
         self._pid = None
         self._exitcode = None
+        self._reentrant_messages = deque()
 
     def _reentrant_call_error(self):
         # gh-109629: this happens if an explicit call to the ResourceTracker
@@ -98,7 +100,7 @@ class ResourceTracker(object):
         # This shouldn't happen (it might when called by a finalizer)
         # so we check for it anyway.
         if self._lock._recursion_count() > 1:
-            return self._reentrant_call_error()
+            raise self._reentrant_call_error()
         if self._fd is None:
             # not running
             return
@@ -128,69 +130,99 @@ class ResourceTracker(object):
 
         This can be run from any process.  Usually a child process will use
         the resource created by its parent.'''
+        return self._ensure_running_and_write()
+
+    def _teardown_dead_process(self):
+        os.close(self._fd)
+
+        # Clean-up to avoid dangling processes.
+        try:
+            # _pid can be None if this process is a child from another
+            # python process, which has started the resource_tracker.
+            if self._pid is not None:
+                os.waitpid(self._pid, 0)
+        except ChildProcessError:
+            # The resource_tracker has already been terminated.
+            pass
+        self._fd = None
+        self._pid = None
+        self._exitcode = None
+
+        warnings.warn('resource_tracker: process died unexpectedly, '
+                      'relaunching.  Some resources might leak.')
+
+    def _launch(self):
+        fds_to_pass = []
+        try:
+            fds_to_pass.append(sys.stderr.fileno())
+        except Exception:
+            pass
+        r, w = os.pipe()
+        try:
+            fds_to_pass.append(r)
+            # process will out live us, so no need to wait on pid
+            exe = spawn.get_executable()
+            args = [
+                exe,
+                *util._args_from_interpreter_flags(),
+                '-c',
+                f'from multiprocessing.resource_tracker import main;main({r})',
+            ]
+            # bpo-33613: Register a signal mask that will block the signals.
+            # This signal mask will be inherited by the child that is going
+            # to be spawned and will protect the child from a race condition
+            # that can make the child die before it registers signal handlers
+            # for SIGINT and SIGTERM. The mask is unregistered after spawning
+            # the child.
+            prev_sigmask = None
+            try:
+                if _HAVE_SIGMASK:
+                    prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
+                pid = util.spawnv_passfds(exe, args, fds_to_pass)
+            finally:
+                if prev_sigmask is not None:
+                    signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask)
+        except:
+            os.close(w)
+            raise
+        else:
+            self._fd = w
+            self._pid = pid
+        finally:
+            os.close(r)
+
+    def _ensure_running_and_write(self, msg=None):
         with self._lock:
             if self._lock._recursion_count() > 1:
                 # The code below is certainly not reentrant-safe, so bail out
-                return self._reentrant_call_error()
+                if msg is None:
+                    raise self._reentrant_call_error()
+                return self._reentrant_messages.append(msg)
+
             if self._fd is not None:
                 # resource tracker was launched before, is it still running?
-                if self._check_alive():
-                    # => still alive
-                    return
-                # => dead, launch it again
-                os.close(self._fd)
-
-                # Clean-up to avoid dangling processes.
+                if msg is None:
+                    to_send = b'PROBE:0:noop\n'
+                else:
+                    to_send = msg
                 try:
-                    # _pid can be None if this process is a child from another
-                    # python process, which has started the resource_tracker.
-                    if self._pid is not None:
-                        os.waitpid(self._pid, 0)
-                except ChildProcessError:
-                    # The resource_tracker has already been terminated.
-                    pass
-                self._fd = None
-                self._pid = None
-                self._exitcode = None
+                    self._write(to_send)
+                except OSError:
+                    self._teardown_dead_process()
+                    self._launch()
 
-                warnings.warn('resource_tracker: process died unexpectedly, '
-                              'relaunching.  Some resources might leak.')
+                msg = None  # message was sent in probe
+            else:
+                self._launch()
 
-            fds_to_pass = []
+        while True:
             try:
-                fds_to_pass.append(sys.stderr.fileno())
-            except Exception:
-                pass
-            cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
-            r, w = os.pipe()
-            try:
-                fds_to_pass.append(r)
-                # process will out live us, so no need to wait on pid
-                exe = spawn.get_executable()
-                args = [exe] + util._args_from_interpreter_flags()
-                args += ['-c', cmd % r]
-                # bpo-33613: Register a signal mask that will block the signals.
-                # This signal mask will be inherited by the child that is going
-                # to be spawned and will protect the child from a race condition
-                # that can make the child die before it registers signal handlers
-                # for SIGINT and SIGTERM. The mask is unregistered after spawning
-                # the child.
-                prev_sigmask = None
-                try:
-                    if _HAVE_SIGMASK:
-                        prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
-                    pid = util.spawnv_passfds(exe, args, fds_to_pass)
-                finally:
-                    if prev_sigmask is not None:
-                        signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask)
-            except:
-                os.close(w)
-                raise
-            else:
-                self._fd = w
-                self._pid = pid
-            finally:
-                os.close(r)
+                reentrant_msg = self._reentrant_messages.popleft()
+            except IndexError:
+                break
+            self._write(reentrant_msg)
+        if msg is not None:
+            self._write(msg)
 
     def _check_alive(self):
         '''Check that the pipe has not been closed by sending a probe.'''
@@ -211,27 +243,18 @@ class ResourceTracker(object):
         '''Unregister name of resource with resource tracker.'''
         self._send('UNREGISTER', name, rtype)
 
+    def _write(self, msg):
+        nbytes = os.write(self._fd, msg)
+        assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}"
+
     def _send(self, cmd, name, rtype):
-        try:
-            self.ensure_running()
-        except ReentrantCallError:
-            # The code below might or might not work, depending on whether
-            # the resource tracker was already running and still alive.
-            # Better warn the user.
-            # (XXX is warnings.warn itself reentrant-safe? :-)
-            warnings.warn(
-                f"ResourceTracker called reentrantly for resource cleanup, "
-                f"which is unsupported. "
-                f"The {rtype} object {name!r} might leak.")
-        msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
+        msg = f"{cmd}:{name}:{rtype}\n".encode("ascii")
         if len(msg) > 512:
             # posix guarantees that writes to a pipe of less than PIPE_BUF
             # bytes are atomic, and that PIPE_BUF >= 512
             raise ValueError('msg too long')
-        nbytes = os.write(self._fd, msg)
-        assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
-            nbytes, len(msg))
 
+        self._ensure_running_and_write(msg)
 
 _resource_tracker = ResourceTracker()
 ensure_running = _resource_tracker.ensure_running
diff --git a/Misc/NEWS.d/next/Library/2025-03-27-08-13-32.gh-issue-131788.0RWiFc.rst b/Misc/NEWS.d/next/Library/2025-03-27-08-13-32.gh-issue-131788.0RWiFc.rst
new file mode 100644 (file)
index 0000000..5258024
--- /dev/null
@@ -0,0 +1 @@
+Make ``ResourceTracker.send`` from :mod:`multiprocessing` re-entrant safe