]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Add a SIGCHILD handler to Subprocess.
authorBen Darnell <ben@bendarnell.com>
Mon, 17 Sep 2012 06:24:45 +0000 (23:24 -0700)
committerBen Darnell <ben@bendarnell.com>
Mon, 17 Sep 2012 06:24:45 +0000 (23:24 -0700)
tornado/process.py
tornado/test/process_test.py

index 3e9711192a1d13f77a3a6c6908c605805c6515d5..2e8ea329ddadfc5a975c7c120fda4b94d7c789e8 100644 (file)
@@ -19,7 +19,9 @@
 from __future__ import absolute_import, division, with_statement
 
 import errno
+import functools
 import os
+import signal
 import subprocess
 import sys
 import time
@@ -29,6 +31,7 @@ from binascii import hexlify
 from tornado import ioloop
 from tornado.iostream import PipeIOStream
 from tornado.log import gen_log
+from tornado import stack_context
 
 try:
     import multiprocessing  # Python 2.6+
@@ -172,27 +175,104 @@ class Subprocess(object):
     """
     STREAM = object()
 
+    _initialized = False
+    _waiting = {}
+
     def __init__(self, *args, **kwargs):
-        io_loop = kwargs.pop('io_loop', None)
+        self.io_loop = kwargs.pop('io_loop', None)
         to_close = []
         if kwargs.get('stdin') is Subprocess.STREAM:
             in_r, in_w = os.pipe()
             kwargs['stdin'] = in_r
             to_close.append(in_r)
-            self.stdin = PipeIOStream(in_w, io_loop=io_loop)
+            self.stdin = PipeIOStream(in_w, io_loop=self.io_loop)
         if kwargs.get('stdout') is Subprocess.STREAM:
             out_r, out_w = os.pipe()
             kwargs['stdout'] = out_w
             to_close.append(out_w)
-            self.stdout = PipeIOStream(out_r, io_loop=io_loop)
+            self.stdout = PipeIOStream(out_r, io_loop=self.io_loop)
         if kwargs.get('stderr') is Subprocess.STREAM:
             err_r, err_w = os.pipe()
             kwargs['stderr'] = err_w
             to_close.append(err_w)
-            self.stdout = PipeIOStream(err_r, io_loop=io_loop)
+            self.stdout = PipeIOStream(err_r, io_loop=self.io_loop)
         self.proc = subprocess.Popen(*args, **kwargs)
         for fd in to_close:
             os.close(fd)
         for attr in ['stdin', 'stdout', 'stderr', 'pid']:
             if not hasattr(self, attr):  # don't clobber streams set above
                 setattr(self, attr, getattr(self.proc, attr))
+        self._exit_callback = None
+        self.returncode = None
+
+    def set_exit_callback(self, callback):
+        """Runs ``callback`` when this process exits.
+
+        The callback takes one argument, the return code of the process.
+
+        This method uses a ``SIGCHILD`` handler, which is a global setting
+        and may conflict if you have other libraries trying to handle the
+        same signal.  If you are using more than one ``IOLoop`` it may
+        be necessary to call `Subprocess.initialize` first to designate
+        one ``IOLoop`` to run the signal handlers.
+
+        In many cases a close callback on the stdout or stderr streams
+        can be used as an alternative to an exit callback if the
+        signal handler is causing a problem.
+        """
+        self._exit_callback = stack_context.wrap(callback)
+        Subprocess.initialize(self.io_loop)
+        Subprocess._waiting[self.pid] = self
+        Subprocess._try_cleanup_process(self.pid)
+
+    @classmethod
+    def initialize(cls, io_loop=None):
+        """Initializes the ``SIGCHILD`` handler.
+
+        The signal handler is run on an IOLoop to avoid locking issues.
+        Note that the IOLoop used for signal handling need not be the
+        same one used by individual Subprocess objects (as long as the
+        IOLoops are each running in separate threads).
+        """
+        if cls._initialized:
+            return
+        if io_loop is None:
+            io_loop = ioloop.IOLoop.instance()
+        cls._old_sigchld = signal.signal(
+            signal.SIGCHLD,
+            lambda sig, frame: io_loop.add_callback(cls._cleanup))
+        cls._initialized = True
+
+    @classmethod
+    def uninitialize(cls):
+        """Removes the ``SIGCHILD`` handler."""
+        if not cls._initialized:
+            return
+        signal.signal(signal.SIGCHLD, cls._old_sigchld)
+        cls._initialized = False
+
+    @classmethod
+    def _cleanup(cls):
+        for pid in cls._waiting.keys():
+            cls._try_cleanup_process(pid)
+
+    @classmethod
+    def _try_cleanup_process(cls, pid):
+        try:
+            ret_pid, status = os.waitpid(pid, os.WNOHANG)
+        except OSError, e:
+            if e.args[0] == errno.ECHILD:
+                return
+        if ret_pid == 0:
+            return
+        assert ret_pid == pid
+        subproc = cls._waiting.pop(pid)
+        subproc.io_loop.add_callback(
+            functools.partial(subproc._set_returncode, status))
+
+    def _set_returncode(self, ret):
+        self.returncode = ret
+        if self._exit_callback:
+            callback = self._exit_callback
+            self._exit_callback = None
+            callback(ret)
index 18aeea332fbe278773a737de593331e5c68e64f5..950aa8d4540fbf2b1488ddd6c5e721329dc68bc8 100644 (file)
@@ -144,3 +144,26 @@ class SubprocessTest(AsyncTestCase):
         subproc.stdout.read_until_close(self.stop)
         data = self.wait()
         self.assertEqual(data, b(""))
+
+    def test_sigchild(self):
+        Subprocess.initialize(io_loop=self.io_loop)
+        self.addCleanup(Subprocess.uninitialize)
+        subproc = Subprocess([sys.executable, '-c', 'pass'],
+                             io_loop=self.io_loop)
+        subproc.set_exit_callback(self.stop)
+        ret = self.wait()
+        self.assertEqual(ret, 0)
+        self.assertEqual(subproc.returncode, ret)
+
+    def test_sigchild_signal(self):
+        Subprocess.initialize(io_loop=self.io_loop)
+        self.addCleanup(Subprocess.uninitialize)
+        subproc = Subprocess([sys.executable, '-c',
+                              'import time; time.sleep(30)'],
+                             io_loop=self.io_loop)
+        subproc.set_exit_callback(self.stop)
+        os.kill(subproc.pid, signal.SIGTERM)
+        ret = self.wait()
+        self.assertEqual(subproc.returncode, ret)
+        self.assertTrue(os.WIFSIGNALED(ret))
+        self.assertEqual(os.WTERMSIG(ret), signal.SIGTERM)