is_android, is_apple_mobile, is_wasm32, reap_children, verbose, warnings_helper
)
from test.support.import_helper import import_module
-from test.support.os_helper import TESTFN, unlink
# Skip these tests if termios is not available
import_module('termios')
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
def test_spawn_doesnt_hang(self):
- self.addCleanup(unlink, TESTFN)
- with open(TESTFN, 'wb') as f:
- STDOUT_FILENO = 1
- dup_stdout = os.dup(STDOUT_FILENO)
- os.dup2(f.fileno(), STDOUT_FILENO)
- buf = b''
- def master_read(fd):
- nonlocal buf
- data = os.read(fd, 1024)
- buf += data
- return data
+ # gh-140482: Do the test in a pty.fork() child to avoid messing
+ # with the interactive test runner's terminal settings.
+ pid, fd = pty.fork()
+ if pid == pty.CHILD:
+ pty.spawn([sys.executable, '-c', 'print("hi there")'])
+ os._exit(0)
+
+ try:
+ buf = bytearray()
try:
- pty.spawn([sys.executable, '-c', 'print("hi there")'],
- master_read)
- finally:
- os.dup2(dup_stdout, STDOUT_FILENO)
- os.close(dup_stdout)
- self.assertEqual(buf, b'hi there\r\n')
- with open(TESTFN, 'rb') as f:
- self.assertEqual(f.read(), b'hi there\r\n')
+ while (data := os.read(fd, 1024)) != b'':
+ buf.extend(data)
+ except OSError as e:
+ if e.errno != errno.EIO:
+ raise
+
+ (pid, status) = os.waitpid(pid, 0)
+ self.assertEqual(status, 0)
+ self.assertEqual(buf.take_bytes(), b"hi there\r\n")
+ finally:
+ os.close(fd)
class SmallPtyTests(unittest.TestCase):
"""These tests don't spawn children or hang."""