# that can make the child die before it registers signal handlers
# for SIGINT and SIGTERM. The mask is unregistered after spawning
# the child.
- prev_sigmask = None
try:
if _HAVE_SIGMASK:
- prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
+ signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
pid = util.spawnv_passfds(exe, args, fds_to_pass)
finally:
- if prev_sigmask is not None:
- signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask)
+ if _HAVE_SIGMASK:
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)
except:
os.close(w)
raise
self._test_resource_tracker_leak_resources(
cleanup=cleanup,
)
- @unittest.skipUnless(hasattr(signal, "pthread_sigmask"), "pthread_sigmask is not available")
- def test_resource_tracker_blocked_signals(self):
- #
- # gh-127586: Check that resource_tracker does not override blocked signals of caller.
- #
- from multiprocessing.resource_tracker import ResourceTracker
- signals = {signal.SIGTERM, signal.SIGINT, signal.SIGUSR1}
-
- for sig in signals:
- signal.pthread_sigmask(signal.SIG_SETMASK, {sig})
- self.assertEqual(signal.pthread_sigmask(signal.SIG_BLOCK, set()), {sig})
- tracker = ResourceTracker()
- tracker.ensure_running()
- self.assertEqual(signal.pthread_sigmask(signal.SIG_BLOCK, set()), {sig})
- tracker._stop()
class TestSimpleQueue(unittest.TestCase):