Interact with process:
1. send data to *stdin* (if *input* is not ``None``);
- 2. read data from *stdout* and *stderr*, until EOF is reached;
- 3. wait for process to terminate.
+ 2. closes *stdin*;
+ 3. read data from *stdout* and *stderr*, until EOF is reached;
+ 4. wait for process to terminate.
The optional *input* argument is the data (:class:`bytes` object)
that will be sent to the child process.
Note, that the data read is buffered in memory, so do not use
this method if the data size is large or unlimited.
+ .. versionchanged:: 3.12
+
+ *stdin* gets closed when `input=None` too.
+
.. method:: send_signal(signal)
Sends the signal *signal* to the child process.
async def _feed_stdin(self, input):
debug = self._loop.get_debug()
- self.stdin.write(input)
- if debug:
- logger.debug(
- '%r communicate: feed stdin (%s bytes)', self, len(input))
+ if input is not None:
+ self.stdin.write(input)
+ if debug:
+ logger.debug(
+ '%r communicate: feed stdin (%s bytes)', self, len(input))
try:
await self.stdin.drain()
except (BrokenPipeError, ConnectionResetError) as exc:
return output
async def communicate(self, input=None):
- if input is not None:
+ if self.stdin is not None:
stdin = self._feed_stdin(input)
else:
stdin = self._noop()
self.assertEqual(exitcode, 0)
self.assertEqual(stdout, b'some data')
+ def test_communicate_none_input(self):
+ args = PROGRAM_CAT
+
+ async def run():
+ proc = await asyncio.create_subprocess_exec(
+ *args,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ )
+ stdout, stderr = await proc.communicate()
+ return proc.returncode, stdout
+
+ task = run()
+ task = asyncio.wait_for(task, support.LONG_TIMEOUT)
+ exitcode, stdout = self.loop.run_until_complete(task)
+ self.assertEqual(exitcode, 0)
+ self.assertEqual(stdout, b'')
+
def test_shell(self):
proc = self.loop.run_until_complete(
asyncio.create_subprocess_shell('exit 7')
--- /dev/null
+Make :func:`asyncio.subprocess.Process.communicate` close the subprocess's stdin even when called with ``input=None``.