import sys
import threading
import warnings
+from collections import deque
from . import spawn
from . import util
self._fd = None
self._pid = None
self._exitcode = None
+ self._reentrant_messages = deque()
def _reentrant_call_error(self):
# gh-109629: this happens if an explicit call to the ResourceTracker
# This shouldn't happen (it might when called by a finalizer)
# so we check for it anyway.
if self._lock._recursion_count() > 1:
- return self._reentrant_call_error()
+ raise self._reentrant_call_error()
if self._fd is None:
# not running
return
This can be run from any process. Usually a child process will use
the resource created by its parent.'''
+ return self._ensure_running_and_write()
+
+ def _teardown_dead_process(self):
+ os.close(self._fd)
+
+ # Clean-up to avoid dangling processes.
+ try:
+ # _pid can be None if this process is a child from another
+ # python process, which has started the resource_tracker.
+ if self._pid is not None:
+ os.waitpid(self._pid, 0)
+ except ChildProcessError:
+ # The resource_tracker has already been terminated.
+ pass
+ self._fd = None
+ self._pid = None
+ self._exitcode = None
+
+ warnings.warn('resource_tracker: process died unexpectedly, '
+ 'relaunching. Some resources might leak.')
+
+ def _launch(self):
+ fds_to_pass = []
+ try:
+ fds_to_pass.append(sys.stderr.fileno())
+ except Exception:
+ pass
+ r, w = os.pipe()
+ try:
+ fds_to_pass.append(r)
+ # process will out live us, so no need to wait on pid
+ exe = spawn.get_executable()
+ args = [
+ exe,
+ *util._args_from_interpreter_flags(),
+ '-c',
+ f'from multiprocessing.resource_tracker import main;main({r})',
+ ]
+ # bpo-33613: Register a signal mask that will block the signals.
+ # This signal mask will be inherited by the child that is going
+ # to be spawned and will protect the child from a race condition
+ # that can make the child die before it registers signal handlers
+ # for SIGINT and SIGTERM. The mask is unregistered after spawning
+ # the child.
+ prev_sigmask = None
+ try:
+ if _HAVE_SIGMASK:
+ prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
+ pid = util.spawnv_passfds(exe, args, fds_to_pass)
+ finally:
+ if prev_sigmask is not None:
+ signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask)
+ except:
+ os.close(w)
+ raise
+ else:
+ self._fd = w
+ self._pid = pid
+ finally:
+ os.close(r)
+
+ def _ensure_running_and_write(self, msg=None):
with self._lock:
if self._lock._recursion_count() > 1:
# The code below is certainly not reentrant-safe, so bail out
- return self._reentrant_call_error()
+ if msg is None:
+ raise self._reentrant_call_error()
+ return self._reentrant_messages.append(msg)
+
if self._fd is not None:
# resource tracker was launched before, is it still running?
- if self._check_alive():
- # => still alive
- return
- # => dead, launch it again
- os.close(self._fd)
-
- # Clean-up to avoid dangling processes.
+ if msg is None:
+ to_send = b'PROBE:0:noop\n'
+ else:
+ to_send = msg
try:
- # _pid can be None if this process is a child from another
- # python process, which has started the resource_tracker.
- if self._pid is not None:
- os.waitpid(self._pid, 0)
- except ChildProcessError:
- # The resource_tracker has already been terminated.
- pass
- self._fd = None
- self._pid = None
- self._exitcode = None
+ self._write(to_send)
+ except OSError:
+ self._teardown_dead_process()
+ self._launch()
- warnings.warn('resource_tracker: process died unexpectedly, '
- 'relaunching. Some resources might leak.')
+ msg = None # message was sent in probe
+ else:
+ self._launch()
- fds_to_pass = []
+ while True:
try:
- fds_to_pass.append(sys.stderr.fileno())
- except Exception:
- pass
- cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
- r, w = os.pipe()
- try:
- fds_to_pass.append(r)
- # process will out live us, so no need to wait on pid
- exe = spawn.get_executable()
- args = [exe] + util._args_from_interpreter_flags()
- args += ['-c', cmd % r]
- # bpo-33613: Register a signal mask that will block the signals.
- # This signal mask will be inherited by the child that is going
- # to be spawned and will protect the child from a race condition
- # that can make the child die before it registers signal handlers
- # for SIGINT and SIGTERM. The mask is unregistered after spawning
- # the child.
- prev_sigmask = None
- try:
- if _HAVE_SIGMASK:
- prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
- pid = util.spawnv_passfds(exe, args, fds_to_pass)
- finally:
- if prev_sigmask is not None:
- signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask)
- except:
- os.close(w)
- raise
- else:
- self._fd = w
- self._pid = pid
- finally:
- os.close(r)
+ reentrant_msg = self._reentrant_messages.popleft()
+ except IndexError:
+ break
+ self._write(reentrant_msg)
+ if msg is not None:
+ self._write(msg)
def _check_alive(self):
'''Check that the pipe has not been closed by sending a probe.'''
'''Unregister name of resource with resource tracker.'''
self._send('UNREGISTER', name, rtype)
+ def _write(self, msg):
+ nbytes = os.write(self._fd, msg)
+ assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}"
+
def _send(self, cmd, name, rtype):
- try:
- self.ensure_running()
- except ReentrantCallError:
- # The code below might or might not work, depending on whether
- # the resource tracker was already running and still alive.
- # Better warn the user.
- # (XXX is warnings.warn itself reentrant-safe? :-)
- warnings.warn(
- f"ResourceTracker called reentrantly for resource cleanup, "
- f"which is unsupported. "
- f"The {rtype} object {name!r} might leak.")
- msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
+ msg = f"{cmd}:{name}:{rtype}\n".encode("ascii")
if len(msg) > 512:
# posix guarantees that writes to a pipe of less than PIPE_BUF
# bytes are atomic, and that PIPE_BUF >= 512
raise ValueError('msg too long')
- nbytes = os.write(self._fd, msg)
- assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
- nbytes, len(msg))
+ self._ensure_running_and_write(msg)
_resource_tracker = ResourceTracker()
ensure_running = _resource_tracker.ensure_running