__all__ = ['ensure_running', 'register', 'unregister']
+_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
+_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)
+
class SemaphoreTracker(object):
with self._lock:
if self._pid is not None:
# semaphore tracker was launched before, is it still running?
- pid, status = os.waitpid(self._pid, os.WNOHANG)
- if not pid:
- # => still alive
- return
+ try:
+ pid, _ = os.waitpid(self._pid, os.WNOHANG)
+ except ChildProcessError:
+ # The process terminated
+ pass
+ else:
+ if not pid:
+ # => still alive
+ return
+
# => dead, launch it again
os.close(self._fd)
self._fd = None
exe = spawn.get_executable()
args = [exe] + util._args_from_interpreter_flags()
args += ['-c', cmd % r]
- pid = util.spawnv_passfds(exe, args, fds_to_pass)
+ # bpo-33613: Register a signal mask that will block the signals.
+ # This signal mask will be inherited by the child that is going
+ # to be spawned and will protect the child from a race condition
+ # that can make the child die before it registers signal handlers
+ # for SIGINT and SIGTERM. The mask is unregistered after spawning
+ # the child.
+ try:
+ if _HAVE_SIGMASK:
+ signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
+ pid = util.spawnv_passfds(exe, args, fds_to_pass)
+ finally:
+ if _HAVE_SIGMASK:
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)
except:
os.close(w)
raise
unregister = _semaphore_tracker.unregister
getfd = _semaphore_tracker.getfd
-
def main(fd):
'''Run semaphore tracker.'''
# protect the process from ^C and "killall python" etc
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
+ if _HAVE_SIGMASK:
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)
for f in (sys.stdin, sys.stdout):
try:
import struct
import operator
import weakref
+import warnings
import test.support
import test.support.script_helper
from test import support
# bpo-31310: if the semaphore tracker process has died, it should
# be restarted implicitly.
from multiprocessing.semaphore_tracker import _semaphore_tracker
- _semaphore_tracker.ensure_running()
pid = _semaphore_tracker._pid
+ if pid is not None:
+ os.kill(pid, signal.SIGKILL)
+ os.waitpid(pid, 0)
+ with warnings.catch_warnings(record=True) as all_warn:
+ _semaphore_tracker.ensure_running()
+ pid = _semaphore_tracker._pid
+
os.kill(pid, signum)
time.sleep(1.0) # give it time to die
ctx = multiprocessing.get_context("spawn")
- with contextlib.ExitStack() as stack:
- if should_die:
- stack.enter_context(self.assertWarnsRegex(
- UserWarning,
- "semaphore_tracker: process died"))
+ with warnings.catch_warnings(record=True) as all_warn:
sem = ctx.Semaphore()
sem.acquire()
sem.release()
del sem
gc.collect()
self.assertIsNone(wr())
+ if should_die:
+ self.assertEqual(len(all_warn), 1)
+ the_warn = all_warn[0]
+ issubclass(the_warn.category, UserWarning)
+ self.assertTrue("semaphore_tracker: process died"
+ in str(the_warn.message))
+ else:
+ self.assertEqual(len(all_warn), 0)
def test_semaphore_tracker_sigint(self):
# Catchable signal (ignored by semaphore tracker)
self.check_semaphore_tracker_death(signal.SIGINT, False)
+ def test_semaphore_tracker_sigterm(self):
+ # Catchable signal (ignored by semaphore tracker)
+ self.check_semaphore_tracker_death(signal.SIGTERM, False)
+
def test_semaphore_tracker_sigkill(self):
# Uncatchable signal.
self.check_semaphore_tracker_death(signal.SIGKILL, True)