self._save_input(input)
if self._input:
- input_view = memoryview(self._input)
+ if not isinstance(self._input, memoryview):
+ input_view = memoryview(self._input)
+ else:
+ input_view = self._input.cast("b") # byte input required
with _PopenSelector() as selector:
if self.stdin and not self.stdin.closed and self._input:
selector.unregister(key.fileobj)
key.fileobj.close()
else:
- if self._input_offset >= len(self._input):
+ if self._input_offset >= len(input_view):
selector.unregister(key.fileobj)
key.fileobj.close()
elif key.fileobj in (self.stdout, self.stderr):
self.assertEqual(stdout, b"banana")
self.assertEqual(stderr, b"pineapple")
+ def test_communicate_memoryview_input(self):
+ # Test memoryview input with byte elements
+ test_data = b"Hello, memoryview!"
+ mv = memoryview(test_data)
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys; sys.stdout.write(sys.stdin.read())'],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE)
+ self.addCleanup(p.stdout.close)
+ self.addCleanup(p.stdin.close)
+ (stdout, stderr) = p.communicate(mv)
+ self.assertEqual(stdout, test_data)
+ self.assertIsNone(stderr)
+
+ def test_communicate_memoryview_input_nonbyte(self):
+ # Test memoryview input with non-byte elements (e.g., int32)
+ # This tests the fix for gh-134453 where non-byte memoryviews
+ # had incorrect length tracking on POSIX
+ import array
+ # Create an array of 32-bit integers that's large enough to trigger
+ # the chunked writing behavior (> PIPE_BUF)
+ pipe_buf = getattr(select, 'PIPE_BUF', 512)
+ # Each 'i' element is 4 bytes, so we need more than pipe_buf/4 elements
+ # Add some extra to ensure we exceed the buffer size
+ num_elements = pipe_buf + 1
+ test_array = array.array('i', [0x64306f66 for _ in range(num_elements)])
+ expected_bytes = test_array.tobytes()
+ mv = memoryview(test_array)
+
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys; '
+ 'data = sys.stdin.buffer.read(); '
+ 'sys.stdout.buffer.write(data)'],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE)
+ self.addCleanup(p.stdout.close)
+ self.addCleanup(p.stdin.close)
+ (stdout, stderr) = p.communicate(mv)
+ self.assertEqual(stdout, expected_bytes,
+ msg=f"{len(stdout)=} =? {len(expected_bytes)=}")
+ self.assertIsNone(stderr)
+
def test_communicate_timeout(self):
p = subprocess.Popen([sys.executable, "-c",
'import sys,os,time;'