import sysconfig
import select
import shutil
+import socket
import threading
import gc
import textwrap
# On Windows, stdin writing must also honor the timeout rather than
# blocking indefinitely when the pipe buffer fills.
- # Input larger than typical pipe buffer (4-64KB on Windows)
- input_data = b"x" * (128 * 1024)
+ input_data = b"x" * (128 * 1024) # > typical pipe buffer
+
+ # Cross-platform wake mechanism: the slow reader connects to a
+ # loopback TCP socket and blocks in select() on it (capped at 9s
+ # as a safety net we don't expect to hit). After phase 1 raises
+ # TimeoutExpired, the parent sends a byte to release the child so
+ # it drains stdin. A socket (rather than a raw pipe) is required
+ # because Windows select() only supports sockets, not arbitrary
+ # file descriptors.
+ server = socket.create_server(('127.0.0.1', 0), backlog=1)
+ server.settimeout(10) # bound the accept() if the child fails to start
+ port = server.getsockname()[1]
+ # The child sends one byte (low byte of its PID) first so the parent
+ # can detect the rare case of an unrelated process on the same host
+ # connecting to our ephemeral port before our child does. A single
+ # byte gives 1/256 collision odds, which is plenty for flake-prevention.
+ slow_reader = (
+ "import os, socket, sys, select; "
+ f"s = socket.create_connection(('127.0.0.1', {port}), timeout=9); "
+ "s.sendall(bytes([os.getpid() & 0xff])); "
+ "select.select([s], [], [], 9); "
+ "sys.stdout.buffer.write(sys.stdin.buffer.read())"
+ )
p = subprocess.Popen(
- [sys.executable, "-c",
- "import sys, time; "
- "time.sleep(30); " # Don't read stdin for a long time
- "sys.stdout.buffer.write(sys.stdin.buffer.read())"],
+ [sys.executable, "-c", slow_reader],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
+ conn = None
try:
+ conn, _ = server.accept()
+ server.close()
+ server = None
+
+ conn.settimeout(5)
+ peer_byte = conn.recv(1)
+ conn.settimeout(None)
+ self.assertEqual(peer_byte, bytes([p.pid & 0xff]),
+ f"loopback handshake byte {peer_byte!r} != "
+ f"low byte of child PID {p.pid} ({p.pid & 0xff:#x})")
+
timeout = 0.2
start = time.monotonic()
try:
elapsed = time.monotonic() - start
self.fail(
f"TimeoutExpired not raised. communicate() completed in "
- f"{elapsed:.2f}s, but subprocess sleeps for 30s. "
+ f"{elapsed:.2f}s, but slow reader stalls for up to 9s. "
"Stdin writing blocked without enforcing timeout.")
except subprocess.TimeoutExpired:
elapsed = time.monotonic() - start
# Timeout should occur close to the specified timeout value,
# not after waiting for the subprocess to finish sleeping.
# Allow generous margin for slow CI, but must be well under
- # the subprocess sleep time.
+ # the slow-reader's stall cap.
self.assertLess(elapsed, 5.0,
f"TimeoutExpired raised after {elapsed:.2f}s; expected ~{timeout}s. "
"Stdin writing blocked without checking timeout.")
+ # Release the slow reader so it stops blocking and drains stdin.
+ conn.sendall(b'go')
+ conn.close()
+ conn = None
+
# After timeout, continue communication. The remaining input
# should be sent and we should receive all data back.
stdout, stderr = p.communicate()
f"Expected {len(input_data)} bytes output but got {len(stdout)}")
self.assertEqual(stdout, input_data)
finally:
+ if conn is not None:
+ conn.close()
+ if server is not None:
+ server.close()
p.kill()
p.wait()