self._pending_calls = collections.deque()
self._pipes = {}
self._finished = False
+ self._pipes_connected = False
if stdin == subprocess.PIPE:
self._pipes[0] = None
else:
if waiter is not None and not waiter.cancelled():
waiter.set_result(None)
+ self._pipes_connected = True
def _call(self, cb, *data):
if self._pending_calls is not None:
assert not self._finished
if self._returncode is None:
return
+ if not self._pipes_connected:
+ # self._pipes_connected can be False if not all pipes were connected
+ # because either the process failed to start or the self._connect_pipes task
+ # got cancelled. In this broken state we consider all pipes disconnected and
+ # to avoid hanging forever in self._wait as otherwise _exit_waiters
+ # would never be woken up, we wake them up here.
+ for waiter in self._exit_waiters:
+ if not waiter.cancelled():
+ waiter.set_result(self._returncode)
if all(p is not None and p.disconnected
for p in self._pipes.values()):
self._finished = True
from asyncio import subprocess
from test.test_asyncio import utils as test_utils
from test import support
-from test.support import os_helper
+from test.support import os_helper, warnings_helper, gc_collect
if not support.has_subprocess_support:
raise unittest.SkipTest("test module requires subprocess")
self.loop.run_until_complete(main())
+ @warnings_helper.ignore_warnings(category=ResourceWarning)
+ def test_subprocess_read_pipe_cancelled(self):
+ async def main():
+ loop = asyncio.get_running_loop()
+ loop.connect_read_pipe = mock.AsyncMock(side_effect=asyncio.CancelledError)
+ with self.assertRaises(asyncio.CancelledError):
+ await asyncio.create_subprocess_exec(*PROGRAM_BLOCKED, stderr=asyncio.subprocess.PIPE)
+
+ asyncio.run(main())
+ gc_collect()
+
+ @warnings_helper.ignore_warnings(category=ResourceWarning)
+ def test_subprocess_write_pipe_cancelled(self):
+ async def main():
+ loop = asyncio.get_running_loop()
+ loop.connect_write_pipe = mock.AsyncMock(side_effect=asyncio.CancelledError)
+ with self.assertRaises(asyncio.CancelledError):
+ await asyncio.create_subprocess_exec(*PROGRAM_BLOCKED, stdin=asyncio.subprocess.PIPE)
+
+ asyncio.run(main())
+ gc_collect()
+
+ @warnings_helper.ignore_warnings(category=ResourceWarning)
+ def test_subprocess_read_write_pipe_cancelled(self):
+ async def main():
+ loop = asyncio.get_running_loop()
+ loop.connect_read_pipe = mock.AsyncMock(side_effect=asyncio.CancelledError)
+ loop.connect_write_pipe = mock.AsyncMock(side_effect=asyncio.CancelledError)
+ with self.assertRaises(asyncio.CancelledError):
+ await asyncio.create_subprocess_exec(
+ *PROGRAM_BLOCKED,
+ stdin=asyncio.subprocess.PIPE,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE,
+ )
+
+ asyncio.run(main())
+ gc_collect()
if sys.platform != 'win32':
# Unix