else:
_PopenSelector = selectors.SelectSelector
+ def _communicate_io_posix(selector, stdin, input_view, input_offset,
+ output_buffers, endtime, *, close_on_eof=False):
+ """
+ Low-level POSIX I/O multiplexing loop used by Popen._communicate.
+
+ Handles the select loop for reading/writing but does not manage
+ stream lifecycle or raise timeout exceptions.
+
+ Args:
+ selector: A _PopenSelector with streams already registered
+ stdin: Writable file object for input, or None
+ input_view: memoryview of input bytes, or None
+ input_offset: Starting offset into input_view (for resume support)
+ output_buffers: Dict {file_object: list} to append read chunks to
+ endtime: Deadline timestamp, or None for no timeout
+ close_on_eof: If True, close output streams immediately when they
+ EOF rather than leaving them open for the caller to close.
+ Used by Popen._communicate() to match its historical behavior
+ of releasing fds as soon as the child closes the corresponding
+ pipe.
+
+ Returns:
+ (new_input_offset, completed)
+ - new_input_offset: How many bytes of input were written
+ - completed: True if all I/O finished, False if timed out
+
+ Note:
+ - Closes output streams on EOF only if close_on_eof=True
+ - Does NOT raise TimeoutExpired (caller handles)
+ - Appends to output_buffers lists in place
+ """
+ stdin_fd = stdin.fileno() if stdin else None
+
+ while selector.get_map():
+ remaining = _deadline_remaining(endtime)
+ if remaining is not None and remaining <= 0:
+ return (input_offset, False) # Timed out
+
+ ready = selector.select(remaining)
+
+ # Check timeout after select (may have woken spuriously)
+ if endtime is not None and _time() > endtime:
+ return (input_offset, False) # Timed out
+
+ for key, events in ready:
+ if key.fd == stdin_fd:
+ chunk = input_view[input_offset:input_offset + _PIPE_BUF]
+ try:
+ input_offset += os.write(key.fd, chunk)
+ except BrokenPipeError:
+ selector.unregister(key.fd)
+ try:
+ stdin.close()
+ except BrokenPipeError:
+ pass
+ else:
+ if input_offset >= len(input_view):
+ selector.unregister(key.fd)
+ try:
+ stdin.close()
+ except BrokenPipeError:
+ pass
+ elif key.fileobj in output_buffers:
+ data = os.read(key.fd, 32768)
+ if not data:
+ selector.unregister(key.fileobj)
+ if close_on_eof:
+ try:
+ key.fileobj.close()
+ except OSError:
+ pass
+ else:
+ output_buffers[key.fileobj].append(data)
+
+ return (input_offset, True) # Completed
+
if _mswindows:
# On Windows we just need to close `Popen._handle` when we no longer need
DEVNULL = -3
+def _deadline_remaining(endtime):
+ """Calculate remaining time until deadline."""
+ if endtime is None:
+ return None
+ return endtime - _time()
+
+
+def _flush_stdin(stdin):
+ """Flush stdin, ignoring BrokenPipeError and closed file ValueError."""
+ try:
+ stdin.flush()
+ except BrokenPipeError:
+ pass # communicate() must ignore BrokenPipeError.
+ except ValueError:
+ # Ignore ValueError: I/O operation on closed file.
+ if not stdin.closed:
+ raise
+
+
+def _make_input_view(input_data):
+ """Convert input data to a byte memoryview for writing.
+
+ Handles the case where input_data is already a memoryview with
+ non-byte elements (e.g., int32 array) by casting to a byte view.
+ This ensures len(view) returns the byte count, not element count.
+ """
+ if not input_data:
+ return None
+ if isinstance(input_data, memoryview):
+ return input_data.cast("b") # ensure byte view for correct len()
+ return memoryview(input_data)
+
+
+def _translate_newlines(data, encoding, errors):
+ """Decode bytes to str and translate newlines to \n."""
+ data = data.decode(encoding, errors)
+ return data.replace("\r\n", "\n").replace("\r", "\n")
+
+
# XXX This function is only used by multiprocessing and the test suite,
# but it's here so that it can be imported when Python is compiled without
# threads.
self.text_mode = bool(universal_newlines)
def _translate_newlines(self, data, encoding, errors):
- data = data.decode(encoding, errors)
- return data.replace("\r\n", "\n").replace("\r", "\n")
+ # Subclass-overridable hook; defers to the module-level helper.
+ return _translate_newlines(data, encoding, errors)
def __enter__(self):
return self
# See the detailed comment in .wait().
if timeout is not None:
sigint_timeout = min(self._sigint_wait_secs,
- self._remaining_time(endtime))
+ _deadline_remaining(endtime))
else:
sigint_timeout = self._sigint_wait_secs
self._sigint_wait_secs = 0 # nothing else should wait.
finally:
self._communication_started = True
try:
- self.wait(timeout=self._remaining_time(endtime))
+ self.wait(timeout=_deadline_remaining(endtime))
except TimeoutExpired as exc:
exc.timeout = timeout
raise
return self._internal_poll()
- def _remaining_time(self, endtime):
- """Convenience for _communicate when computing timeouts."""
- if endtime is None:
- return None
- else:
- return endtime - _time()
-
-
def _check_timeout(self, endtime, orig_timeout, stdout_seq, stderr_seq,
skip_check_and_raise=False):
"""Convenience for checking if a timeout has expired."""
# generated SIGINT and will exit rapidly.
if timeout is not None:
sigint_timeout = min(self._sigint_wait_secs,
- self._remaining_time(endtime))
+ _deadline_remaining(endtime))
else:
sigint_timeout = self._sigint_wait_secs
self._sigint_wait_secs = 0 # nothing else should wait.
# thread remains writing and the fd left open in case the user
# calls communicate again.
if hasattr(self, "_stdin_thread"):
- self._stdin_thread.join(self._remaining_time(endtime))
+ self._stdin_thread.join(_deadline_remaining(endtime))
if self._stdin_thread.is_alive():
raise TimeoutExpired(self.args, orig_timeout)
# threads remain reading and the fds left open in case the user
# calls communicate again.
if self.stdout is not None:
- self.stdout_thread.join(self._remaining_time(endtime))
+ self.stdout_thread.join(_deadline_remaining(endtime))
if self.stdout_thread.is_alive():
raise TimeoutExpired(self.args, orig_timeout)
if self.stderr is not None:
- self.stderr_thread.join(self._remaining_time(endtime))
+ self.stderr_thread.join(_deadline_remaining(endtime))
if self.stderr_thread.is_alive():
raise TimeoutExpired(self.args, orig_timeout)
break
finally:
self._waitpid_lock.release()
- remaining = self._remaining_time(endtime)
+ remaining = _deadline_remaining(endtime)
if remaining <= 0:
raise TimeoutExpired(self.args, timeout)
delay = min(delay * 2, remaining, .05)
if self.stdin and not self._communication_started:
# Flush stdio buffer. This might block, if the user has
# been writing to .stdin in an uncontrolled fashion.
- try:
- self.stdin.flush()
- except BrokenPipeError:
- pass # communicate() must ignore BrokenPipeError.
- except ValueError:
- # ignore ValueError: I/O operation on closed file.
- if not self.stdin.closed:
- raise
+ _flush_stdin(self.stdin)
if not input:
try:
self.stdin.close()
self._save_input(input)
- if self._input:
- if not isinstance(self._input, memoryview):
- input_view = memoryview(self._input)
- else:
- input_view = self._input.cast("b") # byte input required
+ input_view = _make_input_view(self._input)
+ input_offset = self._input_offset if self._input else 0
with _PopenSelector() as selector:
if self.stdin and not self.stdin.closed and self._input:
if self.stderr and not self.stderr.closed:
selector.register(self.stderr, selectors.EVENT_READ)
- while selector.get_map():
- timeout = self._remaining_time(endtime)
- if timeout is not None and timeout <= 0:
- self._check_timeout(endtime, orig_timeout,
- stdout, stderr,
- skip_check_and_raise=True)
- raise RuntimeError( # Impossible :)
- '_check_timeout(..., skip_check_and_raise=True) '
- 'failed to raise TimeoutExpired.')
-
- ready = selector.select(timeout)
- self._check_timeout(endtime, orig_timeout, stdout, stderr)
-
- # XXX Rewrite these to use non-blocking I/O on the file
- # objects; they are no longer using C stdio!
-
- for key, events in ready:
- if key.fileobj is self.stdin:
- chunk = input_view[self._input_offset :
- self._input_offset + _PIPE_BUF]
- try:
- self._input_offset += os.write(key.fd, chunk)
- except BrokenPipeError:
- selector.unregister(key.fileobj)
- key.fileobj.close()
- else:
- if self._input_offset >= len(input_view):
- selector.unregister(key.fileobj)
- key.fileobj.close()
- elif key.fileobj in (self.stdout, self.stderr):
- data = os.read(key.fd, 32768)
- if not data:
- selector.unregister(key.fileobj)
- key.fileobj.close()
- self._fileobj2output[key.fileobj].append(data)
+ stdin_to_write = (self.stdin if self.stdin and self._input
+ and not self.stdin.closed else None)
+ # Persist the returned offset on self so a subsequent
+ # communicate() after a TimeoutExpired resumes mid-input
+ # rather than re-sending bytes the child already consumed.
+ new_offset, completed = _communicate_io_posix(
+ selector,
+ stdin_to_write,
+ input_view,
+ input_offset,
+ self._fileobj2output,
+ endtime,
+ close_on_eof=True)
+ if self._input:
+ self._input_offset = new_offset
+
+ if not completed:
+ self._check_timeout(endtime, orig_timeout, stdout, stderr,
+ skip_check_and_raise=True)
+ raise RuntimeError( # Impossible :)
+ '_check_timeout(..., skip_check_and_raise=True) '
+ 'failed to raise TimeoutExpired.')
+
try:
- self.wait(timeout=self._remaining_time(endtime))
+ self.wait(timeout=_deadline_remaining(endtime))
except TimeoutExpired as exc:
exc.timeout = orig_timeout
raise