from __future__ import absolute_import, division, with_statement
import errno
+import functools
import os
+import signal
import subprocess
import sys
import time
from tornado import ioloop
from tornado.iostream import PipeIOStream
from tornado.log import gen_log
+from tornado import stack_context
try:
import multiprocessing # Python 2.6+
"""
STREAM = object()
+ _initialized = False
+ _waiting = {}
+
def __init__(self, *args, **kwargs):
- io_loop = kwargs.pop('io_loop', None)
+ self.io_loop = kwargs.pop('io_loop', None)
to_close = []
if kwargs.get('stdin') is Subprocess.STREAM:
in_r, in_w = os.pipe()
kwargs['stdin'] = in_r
to_close.append(in_r)
- self.stdin = PipeIOStream(in_w, io_loop=io_loop)
+ self.stdin = PipeIOStream(in_w, io_loop=self.io_loop)
if kwargs.get('stdout') is Subprocess.STREAM:
out_r, out_w = os.pipe()
kwargs['stdout'] = out_w
to_close.append(out_w)
- self.stdout = PipeIOStream(out_r, io_loop=io_loop)
+ self.stdout = PipeIOStream(out_r, io_loop=self.io_loop)
if kwargs.get('stderr') is Subprocess.STREAM:
err_r, err_w = os.pipe()
kwargs['stderr'] = err_w
to_close.append(err_w)
- self.stdout = PipeIOStream(err_r, io_loop=io_loop)
+ self.stdout = PipeIOStream(err_r, io_loop=self.io_loop)
self.proc = subprocess.Popen(*args, **kwargs)
for fd in to_close:
os.close(fd)
for attr in ['stdin', 'stdout', 'stderr', 'pid']:
if not hasattr(self, attr): # don't clobber streams set above
setattr(self, attr, getattr(self.proc, attr))
+ self._exit_callback = None
+ self.returncode = None
+
+ def set_exit_callback(self, callback):
+ """Runs ``callback`` when this process exits.
+
+ The callback takes one argument, the return code of the process.
+
+ This method uses a ``SIGCHILD`` handler, which is a global setting
+ and may conflict if you have other libraries trying to handle the
+ same signal. If you are using more than one ``IOLoop`` it may
+ be necessary to call `Subprocess.initialize` first to designate
+ one ``IOLoop`` to run the signal handlers.
+
+ In many cases a close callback on the stdout or stderr streams
+ can be used as an alternative to an exit callback if the
+ signal handler is causing a problem.
+ """
+ self._exit_callback = stack_context.wrap(callback)
+ Subprocess.initialize(self.io_loop)
+ Subprocess._waiting[self.pid] = self
+ Subprocess._try_cleanup_process(self.pid)
+
+ @classmethod
+ def initialize(cls, io_loop=None):
+ """Initializes the ``SIGCHILD`` handler.
+
+ The signal handler is run on an IOLoop to avoid locking issues.
+ Note that the IOLoop used for signal handling need not be the
+ same one used by individual Subprocess objects (as long as the
+ IOLoops are each running in separate threads).
+ """
+ if cls._initialized:
+ return
+ if io_loop is None:
+ io_loop = ioloop.IOLoop.instance()
+ cls._old_sigchld = signal.signal(
+ signal.SIGCHLD,
+ lambda sig, frame: io_loop.add_callback(cls._cleanup))
+ cls._initialized = True
+
+ @classmethod
+ def uninitialize(cls):
+ """Removes the ``SIGCHILD`` handler."""
+ if not cls._initialized:
+ return
+ signal.signal(signal.SIGCHLD, cls._old_sigchld)
+ cls._initialized = False
+
+ @classmethod
+ def _cleanup(cls):
+ for pid in cls._waiting.keys():
+ cls._try_cleanup_process(pid)
+
+ @classmethod
+ def _try_cleanup_process(cls, pid):
+ try:
+ ret_pid, status = os.waitpid(pid, os.WNOHANG)
+ except OSError, e:
+ if e.args[0] == errno.ECHILD:
+ return
+ if ret_pid == 0:
+ return
+ assert ret_pid == pid
+ subproc = cls._waiting.pop(pid)
+ subproc.io_loop.add_callback(
+ functools.partial(subproc._set_returncode, status))
+
+ def _set_returncode(self, ret):
+ self.returncode = ret
+ if self._exit_callback:
+ callback = self._exit_callback
+ self._exit_callback = None
+ callback(ret)
subproc.stdout.read_until_close(self.stop)
data = self.wait()
self.assertEqual(data, b(""))
+
+ def test_sigchild(self):
+ Subprocess.initialize(io_loop=self.io_loop)
+ self.addCleanup(Subprocess.uninitialize)
+ subproc = Subprocess([sys.executable, '-c', 'pass'],
+ io_loop=self.io_loop)
+ subproc.set_exit_callback(self.stop)
+ ret = self.wait()
+ self.assertEqual(ret, 0)
+ self.assertEqual(subproc.returncode, ret)
+
+ def test_sigchild_signal(self):
+ Subprocess.initialize(io_loop=self.io_loop)
+ self.addCleanup(Subprocess.uninitialize)
+ subproc = Subprocess([sys.executable, '-c',
+ 'import time; time.sleep(30)'],
+ io_loop=self.io_loop)
+ subproc.set_exit_callback(self.stop)
+ os.kill(subproc.pid, signal.SIGTERM)
+ ret = self.wait()
+ self.assertEqual(subproc.returncode, ret)
+ self.assertTrue(os.WIFSIGNALED(ret))
+ self.assertEqual(os.WTERMSIG(ret), signal.SIGTERM)