target of the "as" clause, if there is one.
+.. function:: wait_process(pid, *, exitcode, timeout=None)
+
+ Wait until process *pid* completes and check that the process exit code is
+ *exitcode*.
+
+ Raise an :exc:`AssertionError` if the process exit code is not equal to
+ *exitcode*.
+
+ If the process runs longer than *timeout* seconds (:data:`SHORT_TIMEOUT` by
+ default), kill the process and raise an :exc:`AssertionError`. The timeout
+ feature is not available on Windows.
+
+ .. versionadded:: 3.9
+
+
.. function:: wait_threads_exit(timeout=60.0)
Context manager to wait until all threads created in the ``with`` statement
pid = _resource_tracker._pid
if pid is not None:
os.kill(pid, signal.SIGKILL)
- os.waitpid(pid, 0)
+ support.wait_process(pid, exitcode=-signal.SIGKILL)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
_resource_tracker.ensure_running()
pass
def wait_impl(self, cpid):
- for i in range(10):
- # waitpid() shouldn't hang, but some of the buildbots seem to hang
- # in the forking tests. This is an attempt to fix the problem.
- spid, status = os.waitpid(cpid, os.WNOHANG)
- if spid == cpid:
- break
- time.sleep(2 * SHORTSLEEP)
-
- self.assertEqual(spid, cpid)
- self.assertEqual(status, 0, "cause = %d, exit = %d" % (status&0xff, status>>8))
+ support.wait_process(cpid, exitcode=0)
def test_wait(self):
for i in range(NUM_THREADS):
del self.exc_value
del self.exc_traceback
del self.thread
+
+
+def wait_process(pid, *, exitcode, timeout=None):
+ """
+ Wait until process pid completes and check that the process exit code is
+ exitcode.
+
+ Raise an AssertionError if the process exit code is not equal to exitcode.
+
+ If the process runs longer than timeout seconds (SHORT_TIMEOUT by default),
+ kill the process (if signal.SIGKILL is available) and raise an
+ AssertionError. The timeout feature is not available on Windows.
+ """
+ if os.name != "nt":
+ if timeout is None:
+ timeout = SHORT_TIMEOUT
+ t0 = time.monotonic()
+ deadline = t0 + timeout
+ sleep = 0.001
+ max_sleep = 0.1
+ while True:
+ pid2, status = os.waitpid(pid, os.WNOHANG)
+ if pid2 != 0:
+ break
+ # process is still running
+
+ dt = time.monotonic() - t0
+ if dt > SHORT_TIMEOUT:
+ try:
+ os.kill(pid, signal.SIGKILL)
+ os.waitpid(pid, 0)
+ except OSError:
+ # Ignore errors like ChildProcessError or PermissionError
+ pass
+
+ raise AssertionError(f"process {pid} is still running "
+ f"after {dt:.1f} seconds")
+
+ sleep = min(sleep * 2, max_sleep)
+ time.sleep(sleep)
+
+ if os.WIFEXITED(status):
+ exitcode2 = os.WEXITSTATUS(status)
+ elif os.WIFSIGNALED(status):
+ exitcode2 = -os.WTERMSIG(status)
+ else:
+ raise ValueError(f"invalid wait status: {status!r}")
+ else:
+ # Windows implementation
+ pid2, status = os.waitpid(pid, 0)
+ exitcode2 = (status >> 8)
+
+ if exitcode2 != exitcode:
+ raise AssertionError(f"process {pid} exited with code {exitcode2}, "
+ f"but exit code {exitcode} is expected")
+
+ # sanity check: it should not fail in practice
+ if pid2 != pid:
+ raise AssertionError(f"pid {pid2} != pid {pid}")
from textwrap import dedent
from types import AsyncGeneratorType, FunctionType
from operator import neg
+from test import support
from test.support import (
EnvironmentVarGuard, TESTFN, check_warnings, swap_attr, unlink,
maybe_get_event_loop_policy)
os.close(fd)
# Wait until the child process completes
- os.waitpid(pid, 0)
+ support.wait_process(pid, exitcode=0)
return lines
locks_held__ready_to_fork.wait()
pid = os.fork()
- if pid == 0: # Child.
+ if pid == 0:
+ # Child process
try:
test_logger.info(r'Child process did not deadlock. \o/')
finally:
os._exit(0)
- else: # Parent.
+ else:
+ # Parent process
test_logger.info(r'Parent process returned from fork. \o/')
fork_happened__release_locks_and_end_thread.set()
lock_holder_thread.join()
- start_time = time.monotonic()
- while True:
- test_logger.debug('Waiting for child process.')
- waited_pid, status = os.waitpid(pid, os.WNOHANG)
- if waited_pid == pid:
- break # child process exited.
- if time.monotonic() - start_time > 7:
- break # so long? implies child deadlock.
- time.sleep(0.05)
- test_logger.debug('Done waiting.')
- if waited_pid != pid:
- os.kill(pid, signal.SIGKILL)
- waited_pid, status = os.waitpid(pid, 0)
- self.fail("child process deadlocked.")
- self.assertEqual(status, 0, msg="child process error")
+
+ support.wait_process(pid, exitcode=0)
class BadStream(object):
# Signal the child it can now release the lock and exit.
p.send(b'p')
# Wait for child to exit. Locking should now succeed.
- exited_pid, status = os.waitpid(pid, 0)
+ support.wait_process(pid, exitcode=0)
self._box.lock()
self._box.unlock()
args = [sys.executable, '-c', 'pass']
# Add an implicit test for PyUnicode_FSConverter().
pid = os.spawnv(os.P_NOWAIT, FakePath(args[0]), args)
- status = os.waitpid(pid, 0)
- self.assertEqual(status, (pid, 0))
+ support.wait_process(pid, exitcode=0)
class SpawnTests(unittest.TestCase):
def test_nowait(self):
args = self.create_args()
pid = os.spawnv(os.P_NOWAIT, args[0], args)
- result = os.waitpid(pid, 0)
- self.assertEqual(result[0], pid)
- status = result[1]
- if hasattr(os, 'WIFEXITED'):
- self.assertTrue(os.WIFEXITED(status))
- self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
- else:
- self.assertEqual(status, self.exitcode << 8)
+ support.wait_process(pid, exitcode=self.exitcode)
@requires_os_func('spawnve')
def test_spawnve_bytes(self):
else:
# parent
- cpid, sts = os.waitpid(pid, 0)
- self.assertEqual(cpid, pid)
- self.assertEqual(sts, 0)
+ support.wait_process(pid, exitcode=0)
def test_libc_ver(self):
# check that libc_ver(executable) doesn't raise an exception
requires_sched = unittest.skipUnless(_supports_sched(), 'requires POSIX scheduler API')
+
class PosixTester(unittest.TestCase):
def setUp(self):
@unittest.skipUnless(getattr(os, 'execve', None) in os.supports_fd, "test needs execve() to support the fd parameter")
@unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
- @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
def test_fexecve(self):
fp = os.open(sys.executable, os.O_RDONLY)
try:
os.chdir(os.path.split(sys.executable)[0])
posix.execve(fp, [sys.executable, '-c', 'pass'], os.environ)
else:
- self.assertEqual(os.waitpid(pid, 0), (pid, 0))
+ support.wait_process(pid, exitcode=0)
finally:
os.close(fp)
"""
args = self.python_args('-c', script)
pid = self.spawn_func(args[0], args, os.environ)
- self.assertEqual(os.waitpid(pid, 0), (pid, 0))
+ support.wait_process(pid, exitcode=0)
with open(pidfile) as f:
self.assertEqual(f.read(), str(pid))
args = self.python_args('-c', script)
pid = self.spawn_func(args[0], args,
{**os.environ, 'foo': 'bar'})
- self.assertEqual(os.waitpid(pid, 0), (pid, 0))
+ support.wait_process(pid, exitcode=0)
with open(envfile) as f:
self.assertEqual(f.read(), 'bar')
os.environ,
file_actions=None
)
- self.assertEqual(os.waitpid(pid, 0), (pid, 0))
+ support.wait_process(pid, exitcode=0)
def test_empty_file_actions(self):
pid = self.spawn_func(
os.environ,
file_actions=[]
)
- self.assertEqual(os.waitpid(pid, 0), (pid, 0))
+ support.wait_process(pid, exitcode=0)
def test_resetids_explicit_default(self):
pid = self.spawn_func(
os.environ,
resetids=False
)
- self.assertEqual(os.waitpid(pid, 0), (pid, 0))
+ support.wait_process(pid, exitcode=0)
def test_resetids(self):
pid = self.spawn_func(
os.environ,
resetids=True
)
- self.assertEqual(os.waitpid(pid, 0), (pid, 0))
+ support.wait_process(pid, exitcode=0)
def test_resetids_wrong_type(self):
with self.assertRaises(TypeError):
os.environ,
setpgroup=os.getpgrp()
)
- self.assertEqual(os.waitpid(pid, 0), (pid, 0))
+ support.wait_process(pid, exitcode=0)
def test_setpgroup_wrong_type(self):
with self.assertRaises(TypeError):
os.environ,
setsigmask=[signal.SIGUSR1]
)
- self.assertEqual(os.waitpid(pid, 0), (pid, 0))
+ support.wait_process(pid, exitcode=0)
def test_setsigmask_wrong_type(self):
with self.assertRaises(TypeError):
finally:
os.close(wfd)
- self.assertEqual(os.waitpid(pid, 0), (pid, 0))
+ support.wait_process(pid, exitcode=0)
+
output = os.read(rfd, 100)
child_sid = int(output)
parent_sid = os.getsid(os.getpid())
finally:
signal.signal(signal.SIGUSR1, original_handler)
- pid2, status = os.waitpid(pid, 0)
- self.assertEqual(pid2, pid)
- self.assertTrue(os.WIFSIGNALED(status), status)
- self.assertEqual(os.WTERMSIG(status), signal.SIGUSR1)
+ support.wait_process(pid, exitcode=-signal.SIGUSR1)
def test_setsigdef_wrong_type(self):
with self.assertRaises(TypeError):
os.environ,
scheduler=(None, os.sched_param(priority))
)
- self.assertEqual(os.waitpid(pid, 0), (pid, 0))
+ support.wait_process(pid, exitcode=0)
@requires_sched
@unittest.skipIf(sys.platform.startswith(('freebsd', 'netbsd')),
os.environ,
scheduler=(policy, os.sched_param(priority))
)
- self.assertEqual(os.waitpid(pid, 0), (pid, 0))
+ support.wait_process(pid, exitcode=0)
def test_multiple_file_actions(self):
file_actions = [
self.NOOP_PROGRAM,
os.environ,
file_actions=file_actions)
- self.assertEqual(os.waitpid(pid, 0), (pid, 0))
+ support.wait_process(pid, exitcode=0)
def test_bad_file_actions(self):
args = self.NOOP_PROGRAM
args = self.python_args('-c', script)
pid = self.spawn_func(args[0], args, os.environ,
file_actions=file_actions)
- self.assertEqual(os.waitpid(pid, 0), (pid, 0))
+
+ support.wait_process(pid, exitcode=0)
with open(outfile) as f:
self.assertEqual(f.read(), 'hello')
args = self.python_args('-c', script)
pid = self.spawn_func(args[0], args, os.environ,
file_actions=[(os.POSIX_SPAWN_CLOSE, 0)])
- self.assertEqual(os.waitpid(pid, 0), (pid, 0))
+
+ support.wait_process(pid, exitcode=0)
with open(closefile) as f:
self.assertEqual(f.read(), 'is closed %d' % errno.EBADF)
args = self.python_args('-c', script)
pid = self.spawn_func(args[0], args, os.environ,
file_actions=file_actions)
- self.assertEqual(os.waitpid(pid, 0), (pid, 0))
+ support.wait_process(pid, exitcode=0)
with open(dupfile) as f:
self.assertEqual(f.read(), 'hello')
spawn_args = (program, '-I', '-S', '-c', 'pass')
code = textwrap.dedent("""
import os
+ from test import support
+
args = %a
pid = os.posix_spawnp(args[0], args, os.environ)
- pid2, status = os.waitpid(pid, 0)
- if pid2 != pid:
- raise Exception(f"pid {pid2} != {pid}")
- if status != 0:
- raise Exception(f"status {status} != 0")
+
+ support.wait_process(pid, exitcode=0)
""" % (spawn_args,))
# Use a subprocess to test os.posix_spawnp() with a modified PATH
child_val = eval(f.read())
self.assertNotEqual(val, child_val)
- pid, status = os.waitpid(pid, 0)
- self.assertEqual(status, 0)
+ support.wait_process(pid, exitcode=0)
if __name__ == "__main__":
except:
raise
finally:
- pid2, status = os.waitpid(pid, 0)
- testcase.assertEqual(pid2, pid)
- testcase.assertEqual(72 << 8, status)
+ test.support.wait_process(pid, exitcode=72)
class SocketServerTest(unittest.TestCase):
else:
os.close(wfd)
self.addCleanup(os.close, rfd)
- _, status = os.waitpid(pid, 0)
- self.assertEqual(status, 0)
+ support.wait_process(pid, exitcode=0)
child_random = os.read(rfd, 16)
self.assertEqual(len(child_random), 16)
proc = subprocess.Popen(args)
# Wait until the real process completes to avoid zombie process
- pid = proc.pid
- pid, status = os.waitpid(pid, 0)
- self.assertEqual(status, 0)
+ support.wait_process(proc.pid, exitcode=0)
status = _testcapi.W_STOPCODE(3)
- with mock.patch('subprocess.os.waitpid', return_value=(pid, status)):
+ with mock.patch('subprocess.os.waitpid', return_value=(proc.pid, status)):
returncode = proc.wait()
self.assertEqual(returncode, -3)
proc = subprocess.Popen(ZERO_RETURN_CMD)
# wait until the process completes without using the Popen APIs.
- pid, status = os.waitpid(proc.pid, 0)
- self.assertEqual(pid, proc.pid)
- self.assertTrue(os.WIFEXITED(status), status)
- self.assertEqual(os.WEXITSTATUS(status), 0)
+ support.wait_process(proc.pid, exitcode=0)
# returncode is still None but the process completed.
self.assertIsNone(proc.returncode)
with support.temp_cwd() as temp_path:
pid = os.fork()
if pid != 0:
- # parent process (child has pid == 0)
+ # parent process
# wait for the child to terminate
- (pid, status) = os.waitpid(pid, 0)
- if status != 0:
- raise AssertionError(f"Child process failed with exit "
- f"status indication 0x{status:x}.")
+ support.wait_process(pid, exitcode=0)
# Make sure that temp_path is still present. When the child
# process leaves the 'temp_cwd'-context, the __exit__()-
child_value = os.read(read_fd, len(parent_value)).decode("ascii")
finally:
if pid:
- # best effort to ensure the process can't bleed out
- # via any bugs above
- try:
- os.kill(pid, signal.SIGKILL)
- except OSError:
- pass
-
- # Read the process exit status to avoid zombie process
- os.waitpid(pid, 0)
+ support.wait_process(pid, exitcode=0)
os.close(read_fd)
os.close(write_fd)
finally:
os._exit(exitcode)
else:
- pid2, status = os.waitpid(pid, 0)
- self.assertTrue(os.WIFEXITED(status))
- exitcode = os.WEXITSTATUS(status)
- self.assertEqual(exitcode, 0)
+ support.wait_process(pid, exitcode=0)
class TestSnapshot(unittest.TestCase):
os.close(fds[1])
self.addCleanup(os.close, fds[0])
parent_value = self.uuid.uuid4().hex
- os.waitpid(pid, 0)
+ support.wait_process(pid, exitcode=0)
child_value = os.read(fds[0], 100).decode('latin-1')
self.assertNotEqual(parent_value, child_value)
--- /dev/null
+Add :func:`test.support.wait_process` function.