]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
asyncio: sync with Tulip
authorVictor Stinner <victor.stinner@gmail.com>
Thu, 29 Jan 2015 23:05:19 +0000 (00:05 +0100)
committerVictor Stinner <victor.stinner@gmail.com>
Thu, 29 Jan 2015 23:05:19 +0000 (00:05 +0100)
Issue #23347: send_signal(), kill() and terminate() methods of
BaseSubprocessTransport now check if the transport was closed and if the
process exited.

Issue #23347: Refactor creation of subprocess transports. Changes on
BaseSubprocessTransport:

* Add a wait() method to wait until the child process exit
* The constructor now accepts an optional waiter parameter. The _post_init()
  coroutine must not be called explicitly anymore. It makes subprocess
  transports closer to other transports, and it gives more freedom if we want
  later to change completly how subprocess transports are created.
* close() now kills the process instead of kindly terminate it: the child
  process may ignore SIGTERM and continue to run. Call explicitly terminate()
  and wait() if you want to kindly terminate the child process.
* close() now logs a warning in debug mode if the process is still running and
  needs to be killed
* _make_subprocess_transport() is now fully asynchronous again: if the creation
  of the transport failed, wait asynchronously for the process eixt. Before the
  wait was synchronous. This change requires close() to *kill*, and not
  terminate, the child process.
* Remove the _kill_wait() method, replaced with a more agressive close()
  method. It fixes _make_subprocess_transport() on error.
  BaseSubprocessTransport.close() calls the close() method of pipe transports,
  whereas _kill_wait() closed directly pipes of the subprocess.Popen object
  without unregistering file descriptors from the selector (which caused severe
  bugs).

These changes simplifies the code of subprocess.py.

Lib/asyncio/base_subprocess.py
Lib/asyncio/subprocess.py
Lib/asyncio/unix_events.py
Lib/asyncio/windows_events.py
Lib/test/test_asyncio/test_events.py
Lib/test/test_asyncio/test_subprocess.py

index 651a9a291ee174d4e2cac74d6996c08187373013..001f9b8c242a46a9495be0d03208ff8c296a15d7 100644 (file)
@@ -3,6 +3,7 @@ import subprocess
 import sys
 import warnings
 
+from . import futures
 from . import protocols
 from . import transports
 from .coroutines import coroutine
@@ -13,27 +14,32 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
 
     def __init__(self, loop, protocol, args, shell,
                  stdin, stdout, stderr, bufsize,
-                 extra=None, **kwargs):
+                 waiter=None, extra=None, **kwargs):
         super().__init__(extra)
         self._closed = False
         self._protocol = protocol
         self._loop = loop
+        self._proc = None
         self._pid = None
-
+        self._returncode = None
+        self._exit_waiters = []
+        self._pending_calls = collections.deque()
         self._pipes = {}
+        self._finished = False
+
         if stdin == subprocess.PIPE:
             self._pipes[0] = None
         if stdout == subprocess.PIPE:
             self._pipes[1] = None
         if stderr == subprocess.PIPE:
             self._pipes[2] = None
-        self._pending_calls = collections.deque()
-        self._finished = False
-        self._returncode = None
+
+        # Create the child process: set the _proc attribute
         self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
                     stderr=stderr, bufsize=bufsize, **kwargs)
         self._pid = self._proc.pid
         self._extra['subprocess'] = self._proc
+
         if self._loop.get_debug():
             if isinstance(args, (bytes, str)):
                 program = args
@@ -42,6 +48,8 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
             logger.debug('process %r created: pid %s',
                          program, self._pid)
 
+        self._loop.create_task(self._connect_pipes(waiter))
+
     def __repr__(self):
         info = [self.__class__.__name__]
         if self._closed:
@@ -77,12 +85,23 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
 
     def close(self):
         self._closed = True
+
         for proto in self._pipes.values():
             if proto is None:
                 continue
             proto.pipe.close()
-        if self._returncode is None:
-            self.terminate()
+
+        if self._proc is not None and self._returncode is None:
+            if self._loop.get_debug():
+                logger.warning('Close running child process: kill %r', self)
+
+            try:
+                self._proc.kill()
+            except ProcessLookupError:
+                pass
+
+            # Don't clear the _proc reference yet because _post_init() may
+            # still run
 
     # On Python 3.3 and older, objects with a destructor part of a reference
     # cycle are never destroyed. It's not more the case on Python 3.4 thanks
@@ -105,59 +124,42 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
         else:
             return None
 
+    def _check_proc(self):
+        if self._closed:
+            raise ValueError("operation on closed transport")
+        if self._proc is None:
+            raise ProcessLookupError()
+
     def send_signal(self, signal):
+        self._check_proc()
         self._proc.send_signal(signal)
 
     def terminate(self):
+        self._check_proc()
         self._proc.terminate()
 
     def kill(self):
+        self._check_proc()
         self._proc.kill()
 
-    def _kill_wait(self):
-        """Close pipes, kill the subprocess and read its return status.
-
-        Function called when an exception is raised during the creation
-        of a subprocess.
-        """
-        self._closed = True
-        if self._loop.get_debug():
-            logger.warning('Exception during subprocess creation, '
-                           'kill the subprocess %r',
-                           self,
-                           exc_info=True)
-
-        proc = self._proc
-        if proc.stdout:
-            proc.stdout.close()
-        if proc.stderr:
-            proc.stderr.close()
-        if proc.stdin:
-            proc.stdin.close()
-
-        try:
-            proc.kill()
-        except ProcessLookupError:
-            pass
-        self._returncode = proc.wait()
-
-        self.close()
-
     @coroutine
-    def _post_init(self):
+    def _connect_pipes(self, waiter):
         try:
             proc = self._proc
             loop = self._loop
+
             if proc.stdin is not None:
                 _, pipe = yield from loop.connect_write_pipe(
                     lambda: WriteSubprocessPipeProto(self, 0),
                     proc.stdin)
                 self._pipes[0] = pipe
+
             if proc.stdout is not None:
                 _, pipe = yield from loop.connect_read_pipe(
                     lambda: ReadSubprocessPipeProto(self, 1),
                     proc.stdout)
                 self._pipes[1] = pipe
+
             if proc.stderr is not None:
                 _, pipe = yield from loop.connect_read_pipe(
                     lambda: ReadSubprocessPipeProto(self, 2),
@@ -166,13 +168,16 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
 
             assert self._pending_calls is not None
 
-            self._loop.call_soon(self._protocol.connection_made, self)
+            loop.call_soon(self._protocol.connection_made, self)
             for callback, data in self._pending_calls:
-                self._loop.call_soon(callback, *data)
+                loop.call_soon(callback, *data)
             self._pending_calls = None
-        except:
-            self._kill_wait()
-            raise
+        except Exception as exc:
+            if waiter is not None and not waiter.cancelled():
+                waiter.set_exception(exc)
+        else:
+            if waiter is not None and not waiter.cancelled():
+                waiter.set_result(None)
 
     def _call(self, cb, *data):
         if self._pending_calls is not None:
@@ -197,6 +202,23 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
         self._call(self._protocol.process_exited)
         self._try_finish()
 
+        # wake up futures waiting for wait()
+        for waiter in self._exit_waiters:
+            if not waiter.cancelled():
+                waiter.set_result(returncode)
+        self._exit_waiters = None
+
+    def wait(self):
+        """Wait until the process exit and return the process return code.
+
+        This method is a coroutine."""
+        if self._returncode is not None:
+            return self._returncode
+
+        waiter = futures.Future(loop=self._loop)
+        self._exit_waiters.append(waiter)
+        return (yield from waiter)
+
     def _try_finish(self):
         assert not self._finished
         if self._returncode is None:
@@ -210,9 +232,9 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
         try:
             self._protocol.connection_lost(exc)
         finally:
+            self._loop = None
             self._proc = None
             self._protocol = None
-            self._loop = None
 
 
 class WriteSubprocessPipeProto(protocols.BaseProtocol):
index c848a21a8f26d229909a490c1f0c242322bb52f6..d0c9779c1c92a1a56f397abfb9ad6b95c8ddcb16 100644 (file)
@@ -25,8 +25,6 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
         super().__init__(loop=loop)
         self._limit = limit
         self.stdin = self.stdout = self.stderr = None
-        self.waiter = futures.Future(loop=loop)
-        self._waiters = collections.deque()
         self._transport = None
 
     def __repr__(self):
@@ -61,9 +59,6 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
                                               reader=None,
                                               loop=self._loop)
 
-        if not self.waiter.cancelled():
-            self.waiter.set_result(None)
-
     def pipe_data_received(self, fd, data):
         if fd == 1:
             reader = self.stdout
@@ -94,16 +89,9 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
                 reader.set_exception(exc)
 
     def process_exited(self):
-        returncode = self._transport.get_returncode()
         self._transport.close()
         self._transport = None
 
-        # wake up futures waiting for wait()
-        while self._waiters:
-            waiter = self._waiters.popleft()
-            if not waiter.cancelled():
-                waiter.set_result(returncode)
-
 
 class Process:
     def __init__(self, transport, protocol, loop):
@@ -124,30 +112,18 @@ class Process:
 
     @coroutine
     def wait(self):
-        """Wait until the process exit and return the process return code."""
-        returncode = self._transport.get_returncode()
-        if returncode is not None:
-            return returncode
-
-        waiter = futures.Future(loop=self._loop)
-        self._protocol._waiters.append(waiter)
-        yield from waiter
-        return waiter.result()
+        """Wait until the process exit and return the process return code.
 
-    def _check_alive(self):
-        if self._transport.get_returncode() is not None:
-            raise ProcessLookupError()
+        This method is a coroutine."""
+        return (yield from self._transport.wait())
 
     def send_signal(self, signal):
-        self._check_alive()
         self._transport.send_signal(signal)
 
     def terminate(self):
-        self._check_alive()
         self._transport.terminate()
 
     def kill(self):
-        self._check_alive()
         self._transport.kill()
 
     @coroutine
@@ -221,11 +197,6 @@ def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
                                             protocol_factory,
                                             cmd, stdin=stdin, stdout=stdout,
                                             stderr=stderr, **kwds)
-    try:
-        yield from protocol.waiter
-    except:
-        transport._kill_wait()
-        raise
     return Process(transport, protocol, loop)
 
 @coroutine
@@ -241,9 +212,4 @@ def create_subprocess_exec(program, *args, stdin=None, stdout=None,
                                             program, *args,
                                             stdin=stdin, stdout=stdout,
                                             stderr=stderr, **kwds)
-    try:
-        yield from protocol.waiter
-    except:
-        transport._kill_wait()
-        raise
     return Process(transport, protocol, loop)
index b06f1b2330d76d5e87c7f6c361b5d81d1d6d38fe..3ecdfd2e0b18cd89dbf39eab8d5a034790a4f60c 100644 (file)
@@ -16,6 +16,7 @@ from . import base_subprocess
 from . import constants
 from . import coroutines
 from . import events
+from . import futures
 from . import selector_events
 from . import selectors
 from . import transports
@@ -175,16 +176,20 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
                                    stdin, stdout, stderr, bufsize,
                                    extra=None, **kwargs):
         with events.get_child_watcher() as watcher:
+            waiter = futures.Future(loop=self)
             transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                               stdin, stdout, stderr, bufsize,
-                                              extra=extra, **kwargs)
+                                              waiter=waiter, extra=extra,
+                                              **kwargs)
+
+            watcher.add_child_handler(transp.get_pid(),
+                                      self._child_watcher_callback, transp)
             try:
-                yield from transp._post_init()
+                yield from waiter
             except:
                 transp.close()
+                yield from transp.wait()
                 raise
-            watcher.add_child_handler(transp.get_pid(),
-                                      self._child_watcher_callback, transp)
 
         return transp
 
@@ -774,7 +779,7 @@ class SafeChildWatcher(BaseChildWatcher):
         pass
 
     def add_child_handler(self, pid, callback, *args):
-        self._callbacks[pid] = callback, args
+        self._callbacks[pid] = (callback, args)
 
         # Prevent a race condition in case the child is already terminated.
         self._do_waitpid(pid)
index 94aafb6f5ab0b16cb3b04b1a9fd7a98ea732302d..437eb0ac9dd7fc11b3c9024aaae6438af03e7b7c 100644 (file)
@@ -366,13 +366,16 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
     def _make_subprocess_transport(self, protocol, args, shell,
                                    stdin, stdout, stderr, bufsize,
                                    extra=None, **kwargs):
+        waiter = futures.Future(loop=self)
         transp = _WindowsSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
-                                             extra=extra, **kwargs)
+                                             waiter=waiter, extra=extra,
+                                             **kwargs)
         try:
-            yield from transp._post_init()
+            yield from waiter
         except:
             transp.close()
+            yield from transp.wait()
             raise
 
         return transp
index 12af62b20931945cfc45c8656f5494e08e005a13..4b957d8f636b87390b5f1659ca8655a748d5b18d 100644 (file)
@@ -1551,9 +1551,10 @@ class SubprocessTestsMixin:
         stdin = transp.get_pipe_transport(0)
         stdin.write(b'Python The Winner')
         self.loop.run_until_complete(proto.got_data[1].wait())
-        transp.close()
+        with test_utils.disable_logger():
+            transp.close()
         self.loop.run_until_complete(proto.completed)
-        self.check_terminated(proto.returncode)
+        self.check_killed(proto.returncode)
         self.assertEqual(b'Python The Winner', proto.data[1])
 
     def test_subprocess_interactive(self):
@@ -1567,21 +1568,20 @@ class SubprocessTestsMixin:
         self.loop.run_until_complete(proto.connected)
         self.assertEqual('CONNECTED', proto.state)
 
-        try:
-            stdin = transp.get_pipe_transport(0)
-            stdin.write(b'Python ')
-            self.loop.run_until_complete(proto.got_data[1].wait())
-            proto.got_data[1].clear()
-            self.assertEqual(b'Python ', proto.data[1])
-
-            stdin.write(b'The Winner')
-            self.loop.run_until_complete(proto.got_data[1].wait())
-            self.assertEqual(b'Python The Winner', proto.data[1])
-        finally:
-            transp.close()
+        stdin = transp.get_pipe_transport(0)
+        stdin.write(b'Python ')
+        self.loop.run_until_complete(proto.got_data[1].wait())
+        proto.got_data[1].clear()
+        self.assertEqual(b'Python ', proto.data[1])
 
+        stdin.write(b'The Winner')
+        self.loop.run_until_complete(proto.got_data[1].wait())
+        self.assertEqual(b'Python The Winner', proto.data[1])
+
+        with test_utils.disable_logger():
+            transp.close()
         self.loop.run_until_complete(proto.completed)
-        self.check_terminated(proto.returncode)
+        self.check_killed(proto.returncode)
 
     def test_subprocess_shell(self):
         connect = self.loop.subprocess_shell(
@@ -1739,9 +1739,10 @@ class SubprocessTestsMixin:
             # GetLastError()==ERROR_INVALID_NAME on Windows!?!  (Using
             # WriteFile() we get ERROR_BROKEN_PIPE as expected.)
             self.assertEqual(b'ERR:OSError', proto.data[2])
-        transp.close()
+        with test_utils.disable_logger():
+            transp.close()
         self.loop.run_until_complete(proto.completed)
-        self.check_terminated(proto.returncode)
+        self.check_killed(proto.returncode)
 
     def test_subprocess_wait_no_same_group(self):
         # start the new process in a new session
index ecc2c9d8a8588440061cb0bab9b49eb3437002fc..4f197f394a1e5c941f9a029c2ab2da05726bfec9 100644 (file)
@@ -4,6 +4,7 @@ import unittest
 from unittest import mock
 
 import asyncio
+from asyncio import base_subprocess
 from asyncio import subprocess
 from asyncio import test_utils
 try:
@@ -23,6 +24,70 @@ PROGRAM_CAT = [
               'data = sys.stdin.buffer.read()',
               'sys.stdout.buffer.write(data)'))]
 
+class TestSubprocessTransport(base_subprocess.BaseSubprocessTransport):
+    def _start(self, *args, **kwargs):
+        self._proc = mock.Mock()
+        self._proc.stdin = None
+        self._proc.stdout = None
+        self._proc.stderr = None
+
+
+class SubprocessTransportTests(test_utils.TestCase):
+    def setUp(self):
+        self.loop = self.new_test_loop()
+        self.set_event_loop(self.loop)
+
+
+    def create_transport(self, waiter=None):
+        protocol = mock.Mock()
+        protocol.connection_made._is_coroutine = False
+        protocol.process_exited._is_coroutine = False
+        transport = TestSubprocessTransport(
+                        self.loop, protocol, ['test'], False,
+                        None, None, None, 0, waiter=waiter)
+        return (transport, protocol)
+
+    def test_close(self):
+        waiter = asyncio.Future(loop=self.loop)
+        transport, protocol = self.create_transport(waiter)
+        transport._process_exited(0)
+        transport.close()
+
+        # The loop didn't run yet
+        self.assertFalse(protocol.connection_made.called)
+
+        # methods must raise ProcessLookupError if the transport was closed
+        self.assertRaises(ValueError, transport.send_signal, signal.SIGTERM)
+        self.assertRaises(ValueError, transport.terminate)
+        self.assertRaises(ValueError, transport.kill)
+
+        self.loop.run_until_complete(waiter)
+
+    def test_proc_exited(self):
+        waiter = asyncio.Future(loop=self.loop)
+        transport, protocol = self.create_transport(waiter)
+        transport._process_exited(6)
+        self.loop.run_until_complete(waiter)
+
+        self.assertEqual(transport.get_returncode(), 6)
+
+        self.assertTrue(protocol.connection_made.called)
+        self.assertTrue(protocol.process_exited.called)
+        self.assertTrue(protocol.connection_lost.called)
+        self.assertEqual(protocol.connection_lost.call_args[0], (None,))
+
+        self.assertFalse(transport._closed)
+        self.assertIsNone(transport._loop)
+        self.assertIsNone(transport._proc)
+        self.assertIsNone(transport._protocol)
+
+        # methods must raise ProcessLookupError if the process exited
+        self.assertRaises(ProcessLookupError,
+                          transport.send_signal, signal.SIGTERM)
+        self.assertRaises(ProcessLookupError, transport.terminate)
+        self.assertRaises(ProcessLookupError, transport.kill)
+
+
 class SubprocessMixin:
 
     def test_stdin_stdout(self):