"""
try:
chunk = self.read_from_fd()
- except socket.error, e:
+ except (socket.error, IOError, OSError), e:
# ssl.SSLError is a subclass of socket.error
gen_log.warning("Read error on %d: %s",
self.fileno(), e)
except (IOError, OSError), e:
if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
return None
+ elif e.args[0] == errno.EBADF:
+ # If the writing half of a pipe is closed, select will
+ # report it as readable but reads will fail with EBADF.
+ self.close()
+ return None
else:
raise
if not chunk:
import errno
import os
+import subprocess
import sys
import time
from binascii import hexlify
from tornado import ioloop
+from tornado.iostream import PipeIOStream
from tornado.log import gen_log
try:
"""
global _task_id
return _task_id
+
+class Subprocess(object):
+ """Wraps ``subprocess.Popen`` with IOStream support.
+
+ The constructor is the same as ``subprocess.Popen`` with the following
+ additions:
+
+ * ``stdin``, ``stdout``, and ``stderr`` may have the value
+ `tornado.process.Subprocess.STREAM`, which will make the corresponding
+ attribute of the resulting Subprocess a `PipeIOStream`.
+ * A new keyword argument ``io_loop`` may be used to pass in an IOLoop.
+ """
+ STREAM = object()
+
+ def __init__(self, *args, **kwargs):
+ io_loop = kwargs.pop('io_loop', None)
+ to_close = []
+ if kwargs.get('stdin') is Subprocess.STREAM:
+ in_r, in_w = os.pipe()
+ kwargs['stdin'] = in_r
+ to_close.append(in_r)
+ self.stdin = PipeIOStream(in_w, io_loop=io_loop)
+ if kwargs.get('stdout') is Subprocess.STREAM:
+ out_r, out_w = os.pipe()
+ kwargs['stdout'] = out_w
+ to_close.append(out_w)
+ self.stdout = PipeIOStream(out_r, io_loop=io_loop)
+ if kwargs.get('stderr') is Subprocess.STREAM:
+ err_r, err_w = os.pipe()
+ kwargs['stderr'] = err_w
+ to_close.append(err_w)
+ self.stdout = PipeIOStream(err_r, io_loop=io_loop)
+ self.proc = subprocess.Popen(*args, **kwargs)
+ for fd in to_close:
+ os.close(fd)
+ for attr in ['stdin', 'stdout', 'stderr', 'pid']:
+ if not hasattr(self, attr): # don't clobber streams set above
+ setattr(self, attr, getattr(self.proc, attr))
import logging
import os
import signal
+import subprocess
import sys
from tornado.httpclient import HTTPClient, HTTPError
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from tornado.log import gen_log
-from tornado.process import fork_processes, task_id
+from tornado.process import fork_processes, task_id, Subprocess
from tornado.simple_httpclient import SimpleAsyncHTTPClient
-from tornado.testing import bind_unused_port, ExpectLog
+from tornado.testing import bind_unused_port, ExpectLog, AsyncTestCase
from tornado.test.util import unittest
+from tornado.util import b
from tornado.web import RequestHandler, Application
# Not using AsyncHTTPTestCase because we need control over the IOLoop.
raise
ProcessTest = unittest.skipIf(os.name != 'posix' or sys.platform == 'cygwin',
"non-unix platform")(ProcessTest)
+
+
+class SubprocessTest(AsyncTestCase):
+ def test_subprocess(self):
+ subproc = Subprocess([sys.executable, '-u', '-i'],
+ stdin=Subprocess.STREAM,
+ stdout=Subprocess.STREAM, stderr=subprocess.STDOUT,
+ io_loop=self.io_loop)
+ self.addCleanup(lambda: os.kill(subproc.pid, signal.SIGTERM))
+ subproc.stdout.read_until(b('>>> '), self.stop)
+ self.wait()
+ subproc.stdin.write(b("print('hello')\n"))
+ subproc.stdout.read_until(b('\n'), self.stop)
+ data = self.wait()
+ self.assertEqual(data, b("hello\n"))
+
+ subproc.stdout.read_until(b(">>> "), self.stop)
+ self.wait()
+ subproc.stdin.write(b("raise SystemExit\n"))
+ subproc.stdout.read_until_close(self.stop)
+ data = self.wait()
+ self.assertEqual(data, b(""))