]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
[3.12] gh-108973: Fix asyncio test_subprocess_consistent_callbacks() (GH-109431)...
authorMiss Islington (bot) <31488909+miss-islington@users.noreply.github.com>
Mon, 2 Oct 2023 15:03:51 +0000 (08:03 -0700)
committerGitHub <noreply@github.com>
Mon, 2 Oct 2023 15:03:51 +0000 (17:03 +0200)
gh-108973: Fix asyncio test_subprocess_consistent_callbacks() (GH-109431)

SubprocessProtocol process_exited() method can be called before
pipe_data_received() and pipe_connection_lost() methods. Document it
and adapt the test for that.

Revert commit 282edd7b2a74c4dfe1bfe3c5b1d30f9c21d554d6.
_child_watcher_callback() calls immediately _process_exited(): don't
add an additional delay with call_soon(). The reverted change didn't
make _process_exited() more determistic: it can still be called
before pipe_connection_lost() for example.

(cherry picked from commit ced6924630037f1e5b3d1dbef2b600152fb07fbb)

Co-authored-by: Victor Stinner <vstinner@python.org>
Co-authored-by: Davide Rizzo <sorcio@gmail.com>
Doc/library/asyncio-llapi-index.rst
Doc/library/asyncio-protocol.rst
Lib/asyncio/unix_events.py
Lib/test/test_asyncio/test_subprocess.py

index 9ce48a24444e66d0387cbc73bf1e01b73e54f09c..67136ba69ec875b69c01569d175a52a8ac99cc37 100644 (file)
@@ -484,19 +484,19 @@ Protocol classes can implement the following **callback methods**:
     :widths: 50 50
     :class: full-width-table
 
-    * - ``callback`` :meth:`pipe_data_received()
-        <SubprocessProtocol.pipe_data_received>`
+    * - ``callback`` :meth:`~SubprocessProtocol.pipe_data_received`
       - Called when the child process writes data into its
         *stdout* or *stderr* pipe.
 
-    * - ``callback`` :meth:`pipe_connection_lost()
-        <SubprocessProtocol.pipe_connection_lost>`
+    * - ``callback`` :meth:`~SubprocessProtocol.pipe_connection_lost`
       - Called when one of the pipes communicating with
         the child process is closed.
 
     * - ``callback`` :meth:`process_exited()
         <SubprocessProtocol.process_exited>`
-      - Called when the child process has exited.
+      - Called when the child process has exited. It can be called before
+        :meth:`~SubprocessProtocol.pipe_data_received` and
+        :meth:`~SubprocessProtocol.pipe_connection_lost` methods.
 
 
 Event Loop Policies
index 7bc906eaafc1f26f228b80c81f0a9ceff48ada4d..48fa02937b5237f7b655f99cbe81e1e127f84634 100644 (file)
@@ -708,6 +708,9 @@ factories passed to the :meth:`loop.subprocess_exec` and
 
    Called when the child process has exited.
 
+   It can be called before :meth:`~SubprocessProtocol.pipe_data_received` and
+   :meth:`~SubprocessProtocol.pipe_connection_lost` methods.
+
 
 Examples
 ========
@@ -1003,12 +1006,26 @@ The subprocess is created by the :meth:`loop.subprocess_exec` method::
         def __init__(self, exit_future):
             self.exit_future = exit_future
             self.output = bytearray()
+            self.pipe_closed = False
+            self.exited = False
+
+        def pipe_connection_lost(self, fd, exc):
+            self.pipe_closed = True
+            self.check_for_exit()
 
         def pipe_data_received(self, fd, data):
             self.output.extend(data)
 
         def process_exited(self):
-            self.exit_future.set_result(True)
+            self.exited = True
+            # process_exited() method can be called before
+            # pipe_connection_lost() method: wait until both methods are
+            # called.
+            self.check_for_exit()
+
+        def check_for_exit(self):
+            if self.pipe_closed and self.exited:
+                self.exit_future.set_result(True)
 
     async def get_date():
         # Get a reference to the event loop as we plan to use
index 17fb4d5f7646ced94a1bbf2c4b12025cda3960a8..f985f9776ed0aa8b6854d7b0b5b5921e19f4cb73 100644 (file)
@@ -226,8 +226,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
         return transp
 
     def _child_watcher_callback(self, pid, returncode, transp):
-        # Skip one iteration for callbacks to be executed
-        self.call_soon_threadsafe(self.call_soon, transp._process_exited, returncode)
+        self.call_soon_threadsafe(transp._process_exited, returncode)
 
     async def create_unix_connection(
             self, protocol_factory, path=None, *,
index eeeca40c15cd2801ed1f7243bec2b8ff3d439531..429ef16fdb0e0547503c6c82bd2f5aa978a6ab98 100644 (file)
@@ -753,21 +753,44 @@ class SubprocessMixin:
 
         self.loop.run_until_complete(main())
 
-    def test_subprocess_consistent_callbacks(self):
+    def test_subprocess_protocol_events(self):
+        # gh-108973: Test that all subprocess protocol methods are called.
+        # The protocol methods are not called in a determistic order.
+        # The order depends on the event loop and the operating system.
         events = []
+        fds = [1, 2]
+        expected = [
+            ('pipe_data_received', 1, b'stdout'),
+            ('pipe_data_received', 2, b'stderr'),
+            ('pipe_connection_lost', 1),
+            ('pipe_connection_lost', 2),
+            'process_exited',
+        ]
+        per_fd_expected = [
+            'pipe_data_received',
+            'pipe_connection_lost',
+        ]
+
         class MyProtocol(asyncio.SubprocessProtocol):
             def __init__(self, exit_future: asyncio.Future) -> None:
                 self.exit_future = exit_future
 
             def pipe_data_received(self, fd, data) -> None:
                 events.append(('pipe_data_received', fd, data))
+                self.exit_maybe()
 
             def pipe_connection_lost(self, fd, exc) -> None:
-                events.append('pipe_connection_lost')
+                events.append(('pipe_connection_lost', fd))
+                self.exit_maybe()
 
             def process_exited(self) -> None:
                 events.append('process_exited')
-                self.exit_future.set_result(True)
+                self.exit_maybe()
+
+            def exit_maybe(self):
+                # Only exit when we got all expected events
+                if len(events) >= len(expected):
+                    self.exit_future.set_result(True)
 
         async def main() -> None:
             loop = asyncio.get_running_loop()
@@ -777,15 +800,24 @@ class SubprocessMixin:
                                                       sys.executable, '-c', code, stdin=None)
             await exit_future
             transport.close()
-            self.assertEqual(events, [
-                ('pipe_data_received', 1, b'stdout'),
-                ('pipe_data_received', 2, b'stderr'),
-                'pipe_connection_lost',
-                'pipe_connection_lost',
-                'process_exited',
-            ])
 
-        self.loop.run_until_complete(main())
+            return events
+
+        events = self.loop.run_until_complete(main())
+
+        # First, make sure that we received all events
+        self.assertSetEqual(set(events), set(expected))
+
+        # Second, check order of pipe events per file descriptor
+        per_fd_events = {fd: [] for fd in fds}
+        for event in events:
+            if event == 'process_exited':
+                continue
+            name, fd = event[:2]
+            per_fd_events[fd].append(name)
+
+        for fd in fds:
+            self.assertEqual(per_fd_events[fd], per_fd_expected, (fd, events))
 
     def test_subprocess_communicate_stdout(self):
         # See https://github.com/python/cpython/issues/100133