self._threads.pop(expected_pid)
+def can_use_pidfd():
+ if not hasattr(os, 'pidfd_open'):
+ return False
+ try:
+ pid = os.getpid()
+ os.close(os.pidfd_open(pid, 0))
+ except OSError:
+ # blocked by security policy like SECCOMP
+ return False
+ return True
+
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
"""UNIX event loop policy with a watcher for child processes."""
def _init_watcher(self):
with events._lock:
if self._watcher is None: # pragma: no branch
- self._watcher = ThreadedChildWatcher()
+ if can_use_pidfd():
+ self._watcher = PidfdChildWatcher()
+ else:
+ self._watcher = ThreadedChildWatcher()
if threading.current_thread() is threading.main_thread():
self._watcher.attach_loop(self._local._loop)
def test_get_default_child_watcher(self):
policy = self.create_policy()
self.assertIsNone(policy._watcher)
-
+ unix_events.can_use_pidfd = mock.Mock()
+ unix_events.can_use_pidfd.return_value = False
watcher = policy.get_child_watcher()
self.assertIsInstance(watcher, asyncio.ThreadedChildWatcher)
self.assertIs(watcher, policy.get_child_watcher())
+ policy = self.create_policy()
+ self.assertIsNone(policy._watcher)
+ unix_events.can_use_pidfd = mock.Mock()
+ unix_events.can_use_pidfd.return_value = True
+ watcher = policy.get_child_watcher()
+ self.assertIsInstance(watcher, asyncio.PidfdChildWatcher)
+
+ self.assertIs(policy._watcher, watcher)
+
+ self.assertIs(watcher, policy.get_child_watcher())
+
def test_get_child_watcher_after_set(self):
policy = self.create_policy()
watcher = asyncio.FastChildWatcher()