git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-47798: Refactor the POSIX subprocess.Popen._communicate selector loop into helpers...
author: Gregory P. Smith <68491+gpshead@users.noreply.github.com>
Mon, 27 Apr 2026 00:40:20 +0000 (17:40 -0700)
committer: GitHub <noreply@github.com>
Mon, 27 Apr 2026 00:40:20 +0000 (00:40 +0000)
No public API change.  Lift the per-iteration select/read/write loop out of
Popen._communicate (POSIX) into a module-level _communicate_io_posix(), with
small _flush_stdin / _make_input_view / _translate_newlines helpers alongside
it.  Popen._communicate calls the helper and persists the returned input
offset for resume-after-timeout.

Retire the private Popen._remaining_time method in favor of module-level
_deadline_remaining; all call sites (POSIX and Windows) updated.

Defensive behavioural deltas: the .close() calls in the I/O loop now swallow
errors (BrokenPipeError for stdin, OSError for stdout/stderr), matching
__exit__ and the no-input path; previously these were bare.

Adds test_communicate_timeout_resume_partial_write to cover _input_offset
bookkeeping across TimeoutExpired/resume.

Lib/subprocess.py
Lib/test/test_subprocess.py

index 7ac2289f535b6d7d535f29764db0321070b5d8b5..38b655f2f7b9d2bce75af6225a5cf27408de6967 100644 (file)
@@ -250,6 +250,82 @@ else:
     else:
         _PopenSelector = selectors.SelectSelector
 
+    def _communicate_io_posix(selector, stdin, input_view, input_offset,
+                              output_buffers, endtime, *, close_on_eof=False):
+        """
+        Low-level POSIX I/O multiplexing loop used by Popen._communicate.
+
+        Runs the select loop for reading/writing.  Closes stdin itself (when
+        input is exhausted or the pipe breaks) and reports timeouts by return.
+
+        Args:
+            selector: A _PopenSelector with streams already registered
+            stdin: Writable file object for input, or None
+            input_view: memoryview of input bytes, or None
+            input_offset: Starting offset into input_view (for resume support)
+            output_buffers: Dict {file_object: list} to append read chunks to
+            endtime: Deadline timestamp, or None for no timeout
+            close_on_eof: If True, close output streams immediately when they
+                EOF rather than leaving them open for the caller to close.
+                Used by Popen._communicate() to match its historical behavior
+                of releasing fds as soon as the child closes the corresponding
+                pipe.
+
+        Returns:
+            (new_input_offset, completed)
+            - new_input_offset: Offset into input_view after the writes
+            - completed: True if all I/O finished, False if timed out
+
+        Note:
+            - Closes output streams on EOF only if close_on_eof=True
+            - Does NOT raise TimeoutExpired (caller handles)
+            - Appends to output_buffers lists in place
+        """
+        stdin_fd = stdin.fileno() if stdin else None
+
+        while selector.get_map():
+            remaining = _deadline_remaining(endtime)
+            if remaining is not None and remaining <= 0:
+                return (input_offset, False)  # Timed out
+
+            ready = selector.select(remaining)
+
+            # Deadline passed during select(): return without handling events.
+            if endtime is not None and _time() > endtime:
+                return (input_offset, False)  # Timed out
+
+            for key, events in ready:
+                if key.fd == stdin_fd:
+                    chunk = input_view[input_offset:input_offset + _PIPE_BUF]
+                    try:
+                        input_offset += os.write(key.fd, chunk)
+                    except BrokenPipeError:
+                        selector.unregister(key.fd)
+                        try:
+                            stdin.close()
+                        except BrokenPipeError:
+                            pass
+                    else:
+                        if input_offset >= len(input_view):
+                            selector.unregister(key.fd)
+                            try:
+                                stdin.close()
+                            except BrokenPipeError:
+                                pass
+                elif key.fileobj in output_buffers:
+                    data = os.read(key.fd, 32768)
+                    if not data:
+                        selector.unregister(key.fileobj)
+                        if close_on_eof:
+                            try:
+                                key.fileobj.close()
+                            except OSError:
+                                pass
+                    else:
+                        output_buffers[key.fileobj].append(data)
+
+        return (input_offset, True)  # Completed
+
 
 if _mswindows:
     # On Windows we just need to close `Popen._handle` when we no longer need
@@ -289,6 +365,45 @@ STDOUT = -2
 DEVNULL = -3
 
 
+def _deadline_remaining(endtime):
+    """Return endtime - _time() (may be negative), or None if no endtime."""
+    if endtime is None:
+        return None
+    return endtime - _time()
+
+
+def _flush_stdin(stdin):
+    """Flush stdin, ignoring BrokenPipeError and closed file ValueError."""
+    try:
+        stdin.flush()
+    except BrokenPipeError:
+        pass  # communicate() must ignore BrokenPipeError.
+    except ValueError:
+        # A closed stdin raises ValueError; any other ValueError propagates.
+        if not stdin.closed:
+            raise
+
+
+def _make_input_view(input_data):
+    """Return a byte memoryview of input_data, or None if it is falsy.
+
+    Handles the case where input_data is already a memoryview with
+    non-byte elements (e.g., int32 array) by casting to a byte view.
+    This ensures len(view) returns the byte count, not element count.
+    """
+    if not input_data:
+        return None
+    if isinstance(input_data, memoryview):
+        return input_data.cast("b")  # ensure byte view for correct len()
+    return memoryview(input_data)
+
+
+def _translate_newlines(data, encoding, errors):
+    """Decode bytes to str and translate \\r\\n and \\r newlines to \\n."""
+    data = data.decode(encoding, errors)
+    return data.replace("\r\n", "\n").replace("\r", "\n")
+
+
 # XXX This function is only used by multiprocessing and the test suite,
 # but it's here so that it can be imported when Python is compiled without
 # threads.
@@ -1149,8 +1264,8 @@ class Popen:
         self.text_mode = bool(universal_newlines)
 
     def _translate_newlines(self, data, encoding, errors):
-        data = data.decode(encoding, errors)
-        return data.replace("\r\n", "\n").replace("\r", "\n")
+        # Subclass-overridable hook; defers to the module-level helper.
+        return _translate_newlines(data, encoding, errors)
 
     def __enter__(self):
         return self
@@ -1277,7 +1392,7 @@ class Popen:
                 # See the detailed comment in .wait().
                 if timeout is not None:
                     sigint_timeout = min(self._sigint_wait_secs,
-                                         self._remaining_time(endtime))
+                                         _deadline_remaining(endtime))
                 else:
                     sigint_timeout = self._sigint_wait_secs
                 self._sigint_wait_secs = 0  # nothing else should wait.
@@ -1290,7 +1405,7 @@ class Popen:
             finally:
                 self._communication_started = True
             try:
-                self.wait(timeout=self._remaining_time(endtime))
+                self.wait(timeout=_deadline_remaining(endtime))
             except TimeoutExpired as exc:
                 exc.timeout = timeout
                 raise
@@ -1304,14 +1419,6 @@ class Popen:
         return self._internal_poll()
 
 
-    def _remaining_time(self, endtime):
-        """Convenience for _communicate when computing timeouts."""
-        if endtime is None:
-            return None
-        else:
-            return endtime - _time()
-
-
     def _check_timeout(self, endtime, orig_timeout, stdout_seq, stderr_seq,
                        skip_check_and_raise=False):
         """Convenience for checking if a timeout has expired."""
@@ -1337,7 +1444,7 @@ class Popen:
             # generated SIGINT and will exit rapidly.
             if timeout is not None:
                 sigint_timeout = min(self._sigint_wait_secs,
-                                     self._remaining_time(endtime))
+                                     _deadline_remaining(endtime))
             else:
                 sigint_timeout = self._sigint_wait_secs
             self._sigint_wait_secs = 0  # nothing else should wait.
@@ -1704,7 +1811,7 @@ class Popen:
             # thread remains writing and the fd left open in case the user
             # calls communicate again.
             if hasattr(self, "_stdin_thread"):
-                self._stdin_thread.join(self._remaining_time(endtime))
+                self._stdin_thread.join(_deadline_remaining(endtime))
                 if self._stdin_thread.is_alive():
                     raise TimeoutExpired(self.args, orig_timeout)
 
@@ -1712,11 +1819,11 @@ class Popen:
             # threads remain reading and the fds left open in case the user
             # calls communicate again.
             if self.stdout is not None:
-                self.stdout_thread.join(self._remaining_time(endtime))
+                self.stdout_thread.join(_deadline_remaining(endtime))
                 if self.stdout_thread.is_alive():
                     raise TimeoutExpired(self.args, orig_timeout)
             if self.stderr is not None:
-                self.stderr_thread.join(self._remaining_time(endtime))
+                self.stderr_thread.join(_deadline_remaining(endtime))
                 if self.stderr_thread.is_alive():
                     raise TimeoutExpired(self.args, orig_timeout)
 
@@ -2210,7 +2317,7 @@ class Popen:
                                 break
                         finally:
                             self._waitpid_lock.release()
-                    remaining = self._remaining_time(endtime)
+                    remaining = _deadline_remaining(endtime)
                     if remaining <= 0:
                         raise TimeoutExpired(self.args, timeout)
                     delay = min(delay * 2, remaining, .05)
@@ -2234,14 +2341,7 @@ class Popen:
             if self.stdin and not self._communication_started:
                 # Flush stdio buffer.  This might block, if the user has
                 # been writing to .stdin in an uncontrolled fashion.
-                try:
-                    self.stdin.flush()
-                except BrokenPipeError:
-                    pass  # communicate() must ignore BrokenPipeError.
-                except ValueError:
-                    # ignore ValueError: I/O operation on closed file.
-                    if not self.stdin.closed:
-                        raise
+                _flush_stdin(self.stdin)
                 if not input:
                     try:
                         self.stdin.close()
@@ -2266,11 +2366,8 @@ class Popen:
 
             self._save_input(input)
 
-            if self._input:
-                if not isinstance(self._input, memoryview):
-                    input_view = memoryview(self._input)
-                else:
-                    input_view = self._input.cast("b")  # byte input required
+            input_view = _make_input_view(self._input)
+            input_offset = self._input_offset if self._input else 0
 
             with _PopenSelector() as selector:
                 if self.stdin and not self.stdin.closed and self._input:
@@ -2280,43 +2377,31 @@ class Popen:
                 if self.stderr and not self.stderr.closed:
                     selector.register(self.stderr, selectors.EVENT_READ)
 
-                while selector.get_map():
-                    timeout = self._remaining_time(endtime)
-                    if timeout is not None and timeout <= 0:
-                        self._check_timeout(endtime, orig_timeout,
-                                            stdout, stderr,
-                                            skip_check_and_raise=True)
-                        raise RuntimeError(  # Impossible :)
-                            '_check_timeout(..., skip_check_and_raise=True) '
-                            'failed to raise TimeoutExpired.')
-
-                    ready = selector.select(timeout)
-                    self._check_timeout(endtime, orig_timeout, stdout, stderr)
-
-                    # XXX Rewrite these to use non-blocking I/O on the file
-                    # objects; they are no longer using C stdio!
-
-                    for key, events in ready:
-                        if key.fileobj is self.stdin:
-                            chunk = input_view[self._input_offset :
-                                               self._input_offset + _PIPE_BUF]
-                            try:
-                                self._input_offset += os.write(key.fd, chunk)
-                            except BrokenPipeError:
-                                selector.unregister(key.fileobj)
-                                key.fileobj.close()
-                            else:
-                                if self._input_offset >= len(input_view):
-                                    selector.unregister(key.fileobj)
-                                    key.fileobj.close()
-                        elif key.fileobj in (self.stdout, self.stderr):
-                            data = os.read(key.fd, 32768)
-                            if not data:
-                                selector.unregister(key.fileobj)
-                                key.fileobj.close()
-                            self._fileobj2output[key.fileobj].append(data)
+                stdin_to_write = (self.stdin if self.stdin and self._input
+                                  and not self.stdin.closed else None)
+                # Persist the returned offset on self so a subsequent
+                # communicate() after a TimeoutExpired resumes mid-input
+                # rather than re-sending bytes the child already consumed.
+                new_offset, completed = _communicate_io_posix(
+                    selector,
+                    stdin_to_write,
+                    input_view,
+                    input_offset,
+                    self._fileobj2output,
+                    endtime,
+                    close_on_eof=True)
+                if self._input:
+                    self._input_offset = new_offset
+
+            if not completed:
+                self._check_timeout(endtime, orig_timeout, stdout, stderr,
+                                    skip_check_and_raise=True)
+                raise RuntimeError(  # Impossible :)
+                    '_check_timeout(..., skip_check_and_raise=True) '
+                    'failed to raise TimeoutExpired.')
+
             try:
-                self.wait(timeout=self._remaining_time(endtime))
+                self.wait(timeout=_deadline_remaining(endtime))
             except TimeoutExpired as exc:
                 exc.timeout = orig_timeout
                 raise
index 3237a9cb49876d3ff85d4f29913a2a92c535bc9e..1a3db527d3d5b83667ac5d1876fa6cef0b21c3cc 100644 (file)
@@ -1130,6 +1130,39 @@ class ProcessTestCase(BaseTestCase):
             p.kill()
             p.wait()
 
+    def test_communicate_timeout_resume_partial_write(self):
+        """Resume writing input after a partial-write TimeoutExpired.
+
+        Exercises the _input_offset bookkeeping across the
+        _communicate_io_posix factoring: a first communicate() must time out
+        mid-write, and a subsequent communicate() must finish delivering the
+        remaining bytes so the child receives the full input intact.
+        """
+        # 1 MiB easily exceeds typical pipe buffers (~64 KiB), so the write
+        # stalls once the buffer fills: the child sleeps 0.5s before reading.
+        input_data = bytes(range(256)) * 4096  # 1 MiB, distinctive pattern
+        self.assertEqual(len(input_data), 1024 * 1024)
+
+        p = subprocess.Popen(
+            [sys.executable, "-c",
+             "import sys, time; "
+             "time.sleep(0.5); "
+             "sys.stdout.buffer.write(sys.stdin.buffer.read())"],
+            stdin=subprocess.PIPE,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE)
+        try:
+            with self.assertRaises(subprocess.TimeoutExpired):
+                p.communicate(input_data, timeout=0.05)
+
+            # Resume: no new input, generous timeout to avoid CI flakes.
+            stdout, stderr = p.communicate(timeout=support.LONG_TIMEOUT)
+            self.assertEqual(len(stdout), len(input_data))
+            self.assertEqual(stdout, input_data)
+        finally:
+            p.kill()
+            p.wait()
+
     # Test for the fd leak reported in http://bugs.python.org/issue2791.
     def test_communicate_pipe_fd_leak(self):
         for stdin_pipe in (False, True):