fh.close()
+ def _writerthread(self, input):
+ self._stdin_write(input)
+
+
def _communicate(self, input, endtime, orig_timeout):
# Start reader threads feeding into a list hanging off of this
# object, unless they've already been started.
self.stderr_thread.daemon = True
self.stderr_thread.start()
- if self.stdin:
- self._stdin_write(input)
+ # Start writer thread to send input to stdin, unless already
+ # started. The thread writes input and closes stdin when done,
+ # or continues in the background on timeout.
+ if self.stdin and not hasattr(self, "_stdin_thread"):
+ self._stdin_thread = \
+ threading.Thread(target=self._writerthread,
+ args=(input,))
+ self._stdin_thread.daemon = True
+ self._stdin_thread.start()
+
+ # Wait for the writer thread, or time out. If we time out, the
+ # thread remains writing and the fd left open in case the user
+ # calls communicate again.
+ if hasattr(self, "_stdin_thread"):
+ self._stdin_thread.join(self._remaining_time(endtime))
+ if self._stdin_thread.is_alive():
+ raise TimeoutExpired(self.args, orig_timeout)
# Wait for the reader threads, or time out. If we time out, the
# threads remain reading and the fds left open in case the user
(stdout, _) = p.communicate()
self.assertEqual(len(stdout), 4 * 64 * 1024)
+ def test_communicate_timeout_large_input(self):
+ # Test that timeout is enforced when writing large input to a
+ # slow-to-read subprocess, and that partial input is preserved
+ # for continuation after timeout (gh-141473).
+ #
+ # This is a regression test for Windows matching POSIX behavior.
+ # On POSIX, select() is used to multiplex I/O with timeout checking.
+ # On Windows, stdin writing must also honor the timeout rather than
+ # blocking indefinitely when the pipe buffer fills.
+
+ # Input larger than typical pipe buffer (4-64KB on Windows)
+ input_data = b"x" * (128 * 1024)
+
+ p = subprocess.Popen(
+ [sys.executable, "-c",
+ "import sys, time; "
+ "time.sleep(30); " # Don't read stdin for a long time
+ "sys.stdout.buffer.write(sys.stdin.buffer.read())"],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+
+ try:
+ timeout = 0.2
+ start = time.monotonic()
+ try:
+ p.communicate(input_data, timeout=timeout)
+ # If we get here without TimeoutExpired, the timeout was ignored
+ elapsed = time.monotonic() - start
+ self.fail(
+ f"TimeoutExpired not raised. communicate() completed in "
+ f"{elapsed:.2f}s, but subprocess sleeps for 30s. "
+ "Stdin writing blocked without enforcing timeout.")
+ except subprocess.TimeoutExpired:
+ elapsed = time.monotonic() - start
+
+ # Timeout should occur close to the specified timeout value,
+ # not after waiting for the subprocess to finish sleeping.
+ # Allow generous margin for slow CI, but must be well under
+ # the subprocess sleep time.
+ self.assertLess(elapsed, 5.0,
+ f"TimeoutExpired raised after {elapsed:.2f}s; expected ~{timeout}s. "
+ "Stdin writing blocked without checking timeout.")
+
+ # After timeout, continue communication. The remaining input
+ # should be sent and we should receive all data back.
+ stdout, stderr = p.communicate()
+
+ # Verify all input was eventually received by the subprocess
+ self.assertEqual(len(stdout), len(input_data),
+ f"Expected {len(input_data)} bytes output but got {len(stdout)}")
+ self.assertEqual(stdout, input_data)
+ finally:
+ p.kill()
+ p.wait()
+
# Test for the fd leak reported in http://bugs.python.org/issue2791.
def test_communicate_pipe_fd_leak(self):
for stdin_pipe in (False, True):