From: Ben Darnell Date: Mon, 17 Sep 2012 06:24:45 +0000 (-0700) Subject: Add a SIGCHILD handler to Subprocess. X-Git-Tag: v3.0.0~272^2~6 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=2a2b40507b1ecdbcae320f3042624a7e8a837a39;p=thirdparty%2Ftornado.git Add a SIGCHILD handler to Subprocess. --- diff --git a/tornado/process.py b/tornado/process.py index 3e9711192..2e8ea329d 100644 --- a/tornado/process.py +++ b/tornado/process.py @@ -19,7 +19,9 @@ from __future__ import absolute_import, division, with_statement import errno +import functools import os +import signal import subprocess import sys import time @@ -29,6 +31,7 @@ from binascii import hexlify from tornado import ioloop from tornado.iostream import PipeIOStream from tornado.log import gen_log +from tornado import stack_context try: import multiprocessing # Python 2.6+ @@ -172,27 +175,104 @@ class Subprocess(object): """ STREAM = object() + _initialized = False + _waiting = {} + def __init__(self, *args, **kwargs): - io_loop = kwargs.pop('io_loop', None) + self.io_loop = kwargs.pop('io_loop', None) to_close = [] if kwargs.get('stdin') is Subprocess.STREAM: in_r, in_w = os.pipe() kwargs['stdin'] = in_r to_close.append(in_r) - self.stdin = PipeIOStream(in_w, io_loop=io_loop) + self.stdin = PipeIOStream(in_w, io_loop=self.io_loop) if kwargs.get('stdout') is Subprocess.STREAM: out_r, out_w = os.pipe() kwargs['stdout'] = out_w to_close.append(out_w) - self.stdout = PipeIOStream(out_r, io_loop=io_loop) + self.stdout = PipeIOStream(out_r, io_loop=self.io_loop) if kwargs.get('stderr') is Subprocess.STREAM: err_r, err_w = os.pipe() kwargs['stderr'] = err_w to_close.append(err_w) - self.stdout = PipeIOStream(err_r, io_loop=io_loop) + self.stdout = PipeIOStream(err_r, io_loop=self.io_loop) self.proc = subprocess.Popen(*args, **kwargs) for fd in to_close: os.close(fd) for attr in ['stdin', 'stdout', 'stderr', 'pid']: if not hasattr(self, attr): # don't clobber streams set above setattr(self, attr, getattr(self.proc, attr)) + self._exit_callback = None + self.returncode = None + + def set_exit_callback(self, callback): + """Runs ``callback`` when this process exits. + + The callback takes one argument, the return code of the process. + + This method uses a ``SIGCHILD`` handler, which is a global setting + and may conflict if you have other libraries trying to handle the + same signal. If you are using more than one ``IOLoop`` it may + be necessary to call `Subprocess.initialize` first to designate + one ``IOLoop`` to run the signal handlers. + + In many cases a close callback on the stdout or stderr streams + can be used as an alternative to an exit callback if the + signal handler is causing a problem. + """ + self._exit_callback = stack_context.wrap(callback) + Subprocess.initialize(self.io_loop) + Subprocess._waiting[self.pid] = self + Subprocess._try_cleanup_process(self.pid) + + @classmethod + def initialize(cls, io_loop=None): + """Initializes the ``SIGCHILD`` handler. + + The signal handler is run on an IOLoop to avoid locking issues. + Note that the IOLoop used for signal handling need not be the + same one used by individual Subprocess objects (as long as the + IOLoops are each running in separate threads). + """ + if cls._initialized: + return + if io_loop is None: + io_loop = ioloop.IOLoop.instance() + cls._old_sigchld = signal.signal( + signal.SIGCHLD, + lambda sig, frame: io_loop.add_callback(cls._cleanup)) + cls._initialized = True + + @classmethod + def uninitialize(cls): + """Removes the ``SIGCHILD`` handler.""" + if not cls._initialized: + return + signal.signal(signal.SIGCHLD, cls._old_sigchld) + cls._initialized = False + + @classmethod + def _cleanup(cls): + for pid in cls._waiting.keys(): + cls._try_cleanup_process(pid) + + @classmethod + def _try_cleanup_process(cls, pid): + try: + ret_pid, status = os.waitpid(pid, os.WNOHANG) + except OSError, e: + if e.args[0] == errno.ECHILD: + return + if ret_pid == 0: + return + assert ret_pid == pid + subproc = cls._waiting.pop(pid) + subproc.io_loop.add_callback( + functools.partial(subproc._set_returncode, status)) + + def _set_returncode(self, ret): + self.returncode = ret + if self._exit_callback: + callback = self._exit_callback + self._exit_callback = None + callback(ret) diff --git a/tornado/test/process_test.py b/tornado/test/process_test.py index 18aeea332..950aa8d45 100644 --- a/tornado/test/process_test.py +++ b/tornado/test/process_test.py @@ -144,3 +144,26 @@ class SubprocessTest(AsyncTestCase): subproc.stdout.read_until_close(self.stop) data = self.wait() self.assertEqual(data, b("")) + + def test_sigchild(self): + Subprocess.initialize(io_loop=self.io_loop) + self.addCleanup(Subprocess.uninitialize) + subproc = Subprocess([sys.executable, '-c', 'pass'], + io_loop=self.io_loop) + subproc.set_exit_callback(self.stop) + ret = self.wait() + self.assertEqual(ret, 0) + self.assertEqual(subproc.returncode, ret) + + def test_sigchild_signal(self): + Subprocess.initialize(io_loop=self.io_loop) + self.addCleanup(Subprocess.uninitialize) + subproc = Subprocess([sys.executable, '-c', + 'import time; time.sleep(30)'], + io_loop=self.io_loop) + subproc.set_exit_callback(self.stop) + os.kill(subproc.pid, signal.SIGTERM) + ret = self.wait() + self.assertEqual(subproc.returncode, ret) + self.assertTrue(os.WIFSIGNALED(ret)) + self.assertEqual(os.WTERMSIG(ret), signal.SIGTERM)