]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
[3.11] gh-109709: Fix asyncio test_stdin_broken_pipe() (#109710) (#109735)
authorVictor Stinner <vstinner@python.org>
Fri, 22 Sep 2023 14:34:53 +0000 (16:34 +0200)
committerGitHub <noreply@github.com>
Fri, 22 Sep 2023 14:34:53 +0000 (14:34 +0000)
gh-109709: Fix asyncio test_stdin_broken_pipe() (#109710)

Replace harcoded sleep of 500 ms with synchronization using a pipe.

Fix also Process._feed_stdin(): catch also BrokenPipeError on
stdin.write(input), not only on stdin.drain().

(cherry picked from commit cbbdf2c1440c804adcfc32ea0470865b3b3b8eb2)

Lib/asyncio/subprocess.py
Lib/test/test_asyncio/test_subprocess.py

index c380bbb0ee93ece71198d7e93dcd2d030090eb14..da4f00a4a07fc834a1e73274748c375219b19db3 100644 (file)
@@ -147,14 +147,16 @@ class Process:
 
     async def _feed_stdin(self, input):
         debug = self._loop.get_debug()
-        self.stdin.write(input)
-        if debug:
-            logger.debug(
-                '%r communicate: feed stdin (%s bytes)', self, len(input))
         try:
+            self.stdin.write(input)
+            if debug:
+                logger.debug(
+                    '%r communicate: feed stdin (%s bytes)', self, len(input))
+
             await self.stdin.drain()
         except (BrokenPipeError, ConnectionResetError) as exc:
-            # communicate() ignores BrokenPipeError and ConnectionResetError
+            # communicate() ignores BrokenPipeError and ConnectionResetError.
+            # write() and drain() can raise these exceptions.
             if debug:
                 logger.debug('%r communicate: stdin got %r', self, exc)
 
index bea2314a528663e110e18f3bf5a05a638a97514e..8b4f14eb48de650fad98442ed0497aac5fbd314c 100644 (file)
@@ -2,6 +2,7 @@ import os
 import shutil
 import signal
 import sys
+import textwrap
 import unittest
 import warnings
 from unittest import mock
@@ -13,9 +14,14 @@ from test.test_asyncio import utils as test_utils
 from test import support
 from test.support import os_helper
 
-if sys.platform != 'win32':
+
+MS_WINDOWS = (sys.platform == 'win32')
+if MS_WINDOWS:
+    import msvcrt
+else:
     from asyncio import unix_events
 
+
 if support.check_sanitizer(address=True):
     raise unittest.SkipTest("Exposes ASAN flakiness in GitHub CI")
 
@@ -253,26 +259,43 @@ class SubprocessMixin:
         finally:
             signal.signal(signal.SIGHUP, old_handler)
 
-    def prepare_broken_pipe_test(self):
+    def test_stdin_broken_pipe(self):
         # buffer large enough to feed the whole pipe buffer
         large_data = b'x' * support.PIPE_MAX_SIZE
 
+        rfd, wfd = os.pipe()
+        self.addCleanup(os.close, rfd)
+        self.addCleanup(os.close, wfd)
+        if MS_WINDOWS:
+            handle = msvcrt.get_osfhandle(rfd)
+            os.set_handle_inheritable(handle, True)
+            code = textwrap.dedent(f'''
+                import os, msvcrt
+                handle = {handle}
+                fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
+                os.read(fd, 1)
+            ''')
+            from subprocess import STARTUPINFO
+            startupinfo = STARTUPINFO()
+            startupinfo.lpAttributeList = {"handle_list": [handle]}
+            kwargs = dict(startupinfo=startupinfo)
+        else:
+            code = f'import os; fd = {rfd}; os.read(fd, 1)'
+            kwargs = dict(pass_fds=(rfd,))
+
         # the program ends before the stdin can be fed
         proc = self.loop.run_until_complete(
             asyncio.create_subprocess_exec(
-                sys.executable, '-c', 'pass',
+                sys.executable, '-c', code,
                 stdin=subprocess.PIPE,
+                **kwargs
             )
         )
 
-        return (proc, large_data)
-
-    def test_stdin_broken_pipe(self):
-        proc, large_data = self.prepare_broken_pipe_test()
-
         async def write_stdin(proc, data):
-            await asyncio.sleep(0.5)
             proc.stdin.write(data)
+            # Only exit the child process once the write buffer is filled
+            os.write(wfd, b'go')
             await proc.stdin.drain()
 
         coro = write_stdin(proc, large_data)
@@ -283,7 +306,16 @@ class SubprocessMixin:
         self.loop.run_until_complete(proc.wait())
 
     def test_communicate_ignore_broken_pipe(self):
-        proc, large_data = self.prepare_broken_pipe_test()
+        # buffer large enough to feed the whole pipe buffer
+        large_data = b'x' * support.PIPE_MAX_SIZE
+
+        # the program ends before the stdin can be fed
+        proc = self.loop.run_until_complete(
+            asyncio.create_subprocess_exec(
+                sys.executable, '-c', 'pass',
+                stdin=subprocess.PIPE,
+            )
+        )
 
         # communicate() must ignore BrokenPipeError when feeding stdin
         self.loop.set_exception_handler(lambda loop, msg: None)