.. note::
- When the ``timeout`` parameter is not ``None``, then (on POSIX) the
- function is implemented using a busy loop (non-blocking call and short
- sleeps). Use the :mod:`asyncio` module for an asynchronous wait: see
+ When ``timeout`` is not ``None`` and the platform supports it, an
+ efficient event-driven mechanism is used to wait for process termination:
+
+ - Linux >= 5.3 uses :func:`os.pidfd_open` + :func:`select.poll`
+ - macOS and other BSD variants use :func:`select.kqueue` +
+ ``KQ_FILTER_PROC`` + ``KQ_NOTE_EXIT``
+ - Windows uses ``WaitForSingleObject``
+
+ If none of these mechanisms are available, the function falls back to a
+ busy loop (non-blocking call and short sleeps).
+
+ .. note::
+
+ Use the :mod:`asyncio` module for an asynchronous wait: see
:class:`asyncio.create_subprocess_exec`.
.. versionchanged:: 3.3
*timeout* was added.
+ .. versionchanged:: 3.15
+ if *timeout* is not ``None``, use efficient event-driven implementation
+ on Linux >= 5.3 and macOS / BSD.
+
.. method:: Popen.communicate(input=None, timeout=None)
Interact with process: Send data to stdin. Read data from stdout and stderr,
(Contributed by Ron Frederick in :gh:`138252`.)
+subprocess
+----------
+
+* :meth:`subprocess.Popen.wait`: when ``timeout`` is not ``None`` and the
+ platform supports it, an efficient event-driven mechanism is used to wait for
+ process termination:
+
+ - Linux >= 5.3 uses :func:`os.pidfd_open` + :func:`select.poll`.
+ - macOS and other BSD variants use :func:`select.kqueue` + ``KQ_FILTER_PROC`` + ``KQ_NOTE_EXIT``.
+ - Windows keeps using ``WaitForSingleObject`` (unchanged).
+
+ If none of these mechanisms are available, the function falls back to the
+ traditional busy loop (non-blocking call and short sleeps).
+ (Contributed by Giampaolo Rodola in :gh:`83069`).
symtable
--------
return False
+def _can_use_pidfd_open():
+ # Availability: Linux >= 5.3
+ if not hasattr(os, "pidfd_open"):
+ return False
+ try:
+ pidfd = os.pidfd_open(os.getpid(), 0)
+ except OSError as err:
+ if err.errno in {errno.EMFILE, errno.ENFILE}:
+ # transitory 'too many open files'
+ return True
+ # likely blocked by security policy like SECCOMP (EPERM,
+ # EACCES, ENOSYS)
+ return False
+ else:
+ os.close(pidfd)
+ return True
+
+
+def _can_use_kqueue():
+ # Availability: macOS, BSD
+ names = (
+ "kqueue",
+ "KQ_EV_ADD",
+ "KQ_EV_ONESHOT",
+ "KQ_FILTER_PROC",
+ "KQ_NOTE_EXIT",
+ )
+ if not all(hasattr(select, x) for x in names):
+ return False
+ kq = None
+ try:
+ kq = select.kqueue()
+ kev = select.kevent(
+ os.getpid(),
+ filter=select.KQ_FILTER_PROC,
+ flags=select.KQ_EV_ADD | select.KQ_EV_ONESHOT,
+ fflags=select.KQ_NOTE_EXIT,
+ )
+ kq.control([kev], 1, 0)
+ return True
+ except OSError as err:
+ if err.errno in {errno.EMFILE, errno.ENFILE}:
+ # transitory 'too many open files'
+ return True
+ return False
+ finally:
+ if kq is not None:
+ kq.close()
+
+
+_CAN_USE_PIDFD_OPEN = not _mswindows and _can_use_pidfd_open()
+_CAN_USE_KQUEUE = not _mswindows and _can_use_kqueue()
+
+
# These are primarily fail-safe knobs for negatives. A True value does not
# guarantee the given libc/syscall API will be used.
_USE_POSIX_SPAWN = _use_posix_spawn()
sts = 0
return (pid, sts)
+ def _wait_pidfd(self, timeout):
+ """Wait for PID to terminate using pidfd_open() + poll().
+ Linux >= 5.3 only.
+ """
+ if not _CAN_USE_PIDFD_OPEN:
+ return False
+ try:
+ pidfd = os.pidfd_open(self.pid, 0)
+ except OSError:
+ # May be:
+ # - ESRCH: no such process
+ # - EMFILE, ENFILE: too many open files (usually 1024)
+ # - ENODEV: anonymous inode filesystem not supported
+ # - EPERM, EACCES, ENOSYS: undocumented; may happen if
+ # blocked by security policy like SECCOMP
+ return False
+
+ try:
+ poller = select.poll()
+ poller.register(pidfd, select.POLLIN)
+ events = poller.poll(timeout * 1000)
+ if not events:
+ raise TimeoutExpired(self.args, timeout)
+ return True
+ finally:
+ os.close(pidfd)
+
+ def _wait_kqueue(self, timeout):
+ """Wait for PID to terminate using kqueue(). macOS and BSD only."""
+ if not _CAN_USE_KQUEUE:
+ return False
+ try:
+ kq = select.kqueue()
+ except OSError:
+ # likely EMFILE / ENFILE (too many open files)
+ return False
+
+ try:
+ kev = select.kevent(
+ self.pid,
+ filter=select.KQ_FILTER_PROC,
+ flags=select.KQ_EV_ADD | select.KQ_EV_ONESHOT,
+ fflags=select.KQ_NOTE_EXIT,
+ )
+ try:
+ events = kq.control([kev], 1, timeout) # wait
+ except OSError:
+ return False
+ else:
+ if not events:
+ raise TimeoutExpired(self.args, timeout)
+ return True
+ finally:
+ kq.close()
def _wait(self, timeout):
- """Internal implementation of wait() on POSIX."""
+ """Internal implementation of wait() on POSIX.
+
+ Uses efficient pidfd_open() + poll() on Linux or kqueue()
+ on macOS/BSD when available. Falls back to polling
+ waitpid(WNOHANG) otherwise.
+ """
if self.returncode is not None:
return self.returncode
if timeout is not None:
- endtime = _time() + timeout
+ if timeout < 0:
+ raise TimeoutExpired(self.args, timeout)
+ started = _time()
+ endtime = started + timeout
+
+ # Try efficient wait first.
+ if self._wait_pidfd(timeout) or self._wait_kqueue(timeout):
+ # Process is gone. At this point os.waitpid(pid, 0)
+ # will return immediately, but in very rare races
+ # the PID may have been reused.
+ # os.waitpid(pid, WNOHANG) ensures we attempt a
+ # non-blocking reap without blocking indefinitely.
+ with self._waitpid_lock:
+ if self.returncode is not None:
+ return self.returncode # Another thread waited.
+ (pid, sts) = self._try_wait(os.WNOHANG)
+ assert pid == self.pid or pid == 0
+ if pid == self.pid:
+ self._handle_exitstatus(sts)
+ return self.returncode
+ # os.waitpid(pid, WNOHANG) returned 0 instead
+ # of our PID, meaning PID has not yet exited,
+ # even though poll() / kqueue() said so. Very
+ # rare and mostly theoretical. Fallback to busy
+ # polling.
+ elapsed = _time() - started
+ endtime -= elapsed
+
# Enter a busy loop if we have a timeout. This busy loop was
# cribbed from Lib/threading.py in Thread.wait() at r71065.
delay = 0.0005 # 500 us -> initial delay of 1 ms
# http://bugs.python.org/issue14396.
if pid == self.pid:
self._handle_exitstatus(sts)
+
return self.returncode
def test_wait_timeout(self):
p = subprocess.Popen([sys.executable,
"-c", "import time; time.sleep(0.3)"])
+ with self.assertRaises(subprocess.TimeoutExpired) as c:
+ p.wait(timeout=0)
with self.assertRaises(subprocess.TimeoutExpired) as c:
p.wait(timeout=0.0001)
self.assertIn("0.0001", str(c.exception)) # For coverage of __str__.
self.assertTrue(proc.stdin.closed)
+
+class FastWaitTestCase(BaseTestCase):
+ """Tests for efficient (pidfd_open() + poll() / kqueue()) process
+ waiting in subprocess.Popen.wait().
+ """
+ CAN_USE_PIDFD_OPEN = subprocess._CAN_USE_PIDFD_OPEN
+ CAN_USE_KQUEUE = subprocess._CAN_USE_KQUEUE
+ COMMAND = [sys.executable, "-c", "import time; time.sleep(0.3)"]
+ WAIT_TIMEOUT = 0.0001 # 0.1 ms
+
+ def assert_fast_waitpid_error(self, patch_point):
+ # Emulate a case where pidfd_open() or kqueue() fails.
+ # Busy-poll wait should be used as fallback.
+ exc = OSError(errno.EMFILE, os.strerror(errno.EMFILE))
+ with mock.patch(patch_point, side_effect=exc) as m:
+ p = subprocess.Popen(self.COMMAND)
+ with self.assertRaises(subprocess.TimeoutExpired):
+ p.wait(self.WAIT_TIMEOUT)
+ self.assertEqual(p.wait(timeout=support.SHORT_TIMEOUT), 0)
+ self.assertTrue(m.called)
+
+ @unittest.skipIf(not CAN_USE_PIDFD_OPEN, reason="needs pidfd_open()")
+ def test_wait_pidfd_open_error(self):
+ self.assert_fast_waitpid_error("os.pidfd_open")
+
+ @unittest.skipIf(not CAN_USE_KQUEUE, reason="needs kqueue() for proc")
+ def test_wait_kqueue_error(self):
+ self.assert_fast_waitpid_error("select.kqueue")
+
+ @unittest.skipIf(not CAN_USE_KQUEUE, reason="needs kqueue() for proc")
+ def test_kqueue_control_error(self):
+ # Emulate a case where kqueue.control() fails. Busy-poll wait
+ # should be used as fallback.
+ p = subprocess.Popen(self.COMMAND)
+ kq_mock = mock.Mock()
+ kq_mock.control.side_effect = OSError(
+ errno.EPERM, os.strerror(errno.EPERM)
+ )
+ kq_mock.close = mock.Mock()
+
+ with mock.patch("select.kqueue", return_value=kq_mock) as m:
+ with self.assertRaises(subprocess.TimeoutExpired):
+ p.wait(self.WAIT_TIMEOUT)
+ self.assertEqual(p.wait(timeout=support.SHORT_TIMEOUT), 0)
+ self.assertTrue(m.called)
+
+ def assert_wait_race_condition(self, patch_target, real_func):
+ # Call pidfd_open() / kqueue(), then terminate the process.
+ # Make sure that the wait call (poll() / kqueue.control())
+ # still works for a terminated PID.
+ p = subprocess.Popen(self.COMMAND)
+
+ def wrapper(*args, **kwargs):
+ ret = real_func(*args, **kwargs)
+ try:
+ os.kill(p.pid, signal.SIGTERM)
+ os.waitpid(p.pid, 0)
+ except OSError:
+ pass
+ return ret
+
+ with mock.patch(patch_target, side_effect=wrapper) as m:
+ status = p.wait(timeout=support.SHORT_TIMEOUT)
+ self.assertTrue(m.called)
+ self.assertEqual(status, 0)
+
+ @unittest.skipIf(not CAN_USE_PIDFD_OPEN, reason="needs pidfd_open()")
+ def test_pidfd_open_race(self):
+ self.assert_wait_race_condition("os.pidfd_open", os.pidfd_open)
+
+ @unittest.skipIf(not CAN_USE_KQUEUE, reason="needs kqueue() for proc")
+ def test_kqueue_race(self):
+ self.assert_wait_race_condition("select.kqueue", select.kqueue)
+
+ def assert_notification_without_immediate_reap(self, patch_target):
+ # Verify fallback to busy polling when poll() / kqueue()
+ # succeeds, but waitpid(pid, WNOHANG) returns (0, 0).
+ def waitpid_wrapper(pid, flags):
+ nonlocal ncalls
+ ncalls += 1
+ if ncalls == 1:
+ return (0, 0)
+ return real_waitpid(pid, flags)
+
+ ncalls = 0
+ real_waitpid = os.waitpid
+ with mock.patch.object(subprocess.Popen, patch_target, return_value=True) as m1:
+ with mock.patch("os.waitpid", side_effect=waitpid_wrapper) as m2:
+ p = subprocess.Popen(self.COMMAND)
+ with self.assertRaises(subprocess.TimeoutExpired):
+ p.wait(self.WAIT_TIMEOUT)
+ self.assertEqual(p.wait(timeout=support.SHORT_TIMEOUT), 0)
+ self.assertTrue(m1.called)
+ self.assertTrue(m2.called)
+
+ @unittest.skipIf(not CAN_USE_PIDFD_OPEN, reason="needs pidfd_open()")
+ def test_pidfd_open_notification_without_immediate_reap(self):
+ self.assert_notification_without_immediate_reap("_wait_pidfd")
+
+ @unittest.skipIf(not CAN_USE_KQUEUE, reason="needs kqueue() for proc")
+ def test_kqueue_notification_without_immediate_reap(self):
+ self.assert_notification_without_immediate_reap("_wait_kqueue")
+
+ @unittest.skipUnless(
+ CAN_USE_PIDFD_OPEN or CAN_USE_KQUEUE,
+ "fast wait mechanism not available"
+ )
+ def test_fast_path_avoid_busy_loop(self):
+ # assert that the busy loop is not called as long as the fast
+ # wait is available
+ with mock.patch('time.sleep') as m:
+ p = subprocess.Popen(self.COMMAND)
+ with self.assertRaises(subprocess.TimeoutExpired):
+ p.wait(self.WAIT_TIMEOUT)
+ self.assertEqual(p.wait(timeout=support.LONG_TIMEOUT), 0)
+ self.assertFalse(m.called)
+
if __name__ == "__main__":
unittest.main()
--- /dev/null
+:meth:`subprocess.Popen.wait`: when ``timeout`` is not ``None``, an efficient
+event-driven mechanism now waits for process termination, if available. Linux
+>= 5.3 uses :func:`os.pidfd_open` + :func:`select.poll`. macOS and other BSD
+variants use :func:`select.kqueue` + ``KQ_FILTER_PROC`` + ``KQ_NOTE_EXIT``.
+Windows keeps using ``WaitForSingleObject`` (unchanged). If none of these
+mechanisms are available, the function falls back to the traditional busy loop
+(non-blocking call and short sleeps). Patch by Giampaolo Rodola.