]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Add tornado.process.Subprocess
authorBen Darnell <ben@bendarnell.com>
Mon, 17 Sep 2012 02:26:03 +0000 (19:26 -0700)
committerBen Darnell <ben@bendarnell.com>
Mon, 17 Sep 2012 02:26:03 +0000 (19:26 -0700)
tornado/iostream.py
tornado/process.py
tornado/test/process_test.py

index 2f716a728cb2729046abed92c7d121c3f1be0665..0ec3bd6f8321bb12537e25a664287a9276fa1b9f 100644 (file)
@@ -379,7 +379,7 @@ class BaseIOStream(object):
         """
         try:
             chunk = self.read_from_fd()
-        except socket.error, e:
+        except (socket.error, IOError, OSError), e:
             # ssl.SSLError is a subclass of socket.error
             gen_log.warning("Read error on %d: %s",
                             self.fileno(), e)
@@ -824,6 +824,11 @@ class PipeIOStream(BaseIOStream):
         except (IOError, OSError), e:
             if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                 return None
+            elif e.args[0] == errno.EBADF:
+                # If the writing half of a pipe is closed, select will
+                # report it as readable but reads will fail with EBADF.
+                self.close()
+                return None
             else:
                 raise
         if not chunk:
index a2c63b83183201a9c948a139c2bc80429f2098b8..3e9711192a1d13f77a3a6c6908c605805c6515d5 100644 (file)
@@ -20,12 +20,14 @@ from __future__ import absolute_import, division, with_statement
 
 import errno
 import os
+import subprocess
 import sys
 import time
 
 from binascii import hexlify
 
 from tornado import ioloop
+from tornado.iostream import PipeIOStream
 from tornado.log import gen_log
 
 try:
@@ -156,3 +158,41 @@ def task_id():
     """
     global _task_id
     return _task_id
+
+class Subprocess(object):
+    """Wraps ``subprocess.Popen`` with IOStream support.
+
+    The constructor is the same as ``subprocess.Popen`` with the following
+    additions:
+
+    * ``stdin``, ``stdout``, and ``stderr`` may have the value
+      `tornado.process.Subprocess.STREAM`, which will make the corresponding
+      attribute of the resulting Subprocess a `PipeIOStream`.
+    * A new keyword argument ``io_loop`` may be used to pass in an IOLoop.
+    """
+    STREAM = object()
+
+    def __init__(self, *args, **kwargs):
+        io_loop = kwargs.pop('io_loop', None)
+        to_close = []
+        if kwargs.get('stdin') is Subprocess.STREAM:
+            in_r, in_w = os.pipe()
+            kwargs['stdin'] = in_r
+            to_close.append(in_r)
+            self.stdin = PipeIOStream(in_w, io_loop=io_loop)
+        if kwargs.get('stdout') is Subprocess.STREAM:
+            out_r, out_w = os.pipe()
+            kwargs['stdout'] = out_w
+            to_close.append(out_w)
+            self.stdout = PipeIOStream(out_r, io_loop=io_loop)
+        if kwargs.get('stderr') is Subprocess.STREAM:
+            err_r, err_w = os.pipe()
+            kwargs['stderr'] = err_w
+            to_close.append(err_w)
+            self.stdout = PipeIOStream(err_r, io_loop=io_loop)
+        self.proc = subprocess.Popen(*args, **kwargs)
+        for fd in to_close:
+            os.close(fd)
+        for attr in ['stdin', 'stdout', 'stderr', 'pid']:
+            if not hasattr(self, attr):  # don't clobber streams set above
+                setattr(self, attr, getattr(self.proc, attr))
index d076cac40b8593e4c3c6531fcf0bfac0574aaadf..18aeea332fbe278773a737de593331e5c68e64f5 100644 (file)
@@ -5,15 +5,17 @@ from __future__ import absolute_import, division, with_statement
 import logging
 import os
 import signal
+import subprocess
 import sys
 from tornado.httpclient import HTTPClient, HTTPError
 from tornado.httpserver import HTTPServer
 from tornado.ioloop import IOLoop
 from tornado.log import gen_log
-from tornado.process import fork_processes, task_id
+from tornado.process import fork_processes, task_id, Subprocess
 from tornado.simple_httpclient import SimpleAsyncHTTPClient
-from tornado.testing import bind_unused_port, ExpectLog
+from tornado.testing import bind_unused_port, ExpectLog, AsyncTestCase
 from tornado.test.util import unittest
+from tornado.util import b
 from tornado.web import RequestHandler, Application
 
 # Not using AsyncHTTPTestCase because we need control over the IOLoop.
@@ -120,3 +122,25 @@ class ProcessTest(unittest.TestCase):
                 raise
 ProcessTest = unittest.skipIf(os.name != 'posix' or sys.platform == 'cygwin',
                               "non-unix platform")(ProcessTest)
+
+
+class SubprocessTest(AsyncTestCase):
+    def test_subprocess(self):
+        subproc = Subprocess([sys.executable, '-u', '-i'],
+                             stdin=Subprocess.STREAM,
+                             stdout=Subprocess.STREAM, stderr=subprocess.STDOUT,
+                             io_loop=self.io_loop)
+        self.addCleanup(lambda: os.kill(subproc.pid, signal.SIGTERM))
+        subproc.stdout.read_until(b('>>> '), self.stop)
+        self.wait()
+        subproc.stdin.write(b("print('hello')\n"))
+        subproc.stdout.read_until(b('\n'), self.stop)
+        data = self.wait()
+        self.assertEqual(data, b("hello\n"))
+
+        subproc.stdout.read_until(b(">>> "), self.stop)
+        self.wait()
+        subproc.stdin.write(b("raise SystemExit\n"))
+        subproc.stdout.read_until_close(self.stop)
+        data = self.wait()
+        self.assertEqual(data, b(""))