]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
tcpserver: Fix multiprocess modes 2220/head
authorBen Darnell <ben@bendarnell.com>
Tue, 19 Dec 2017 03:56:40 +0000 (22:56 -0500)
committerBen Darnell <ben@bendarnell.com>
Tue, 19 Dec 2017 14:10:09 +0000 (09:10 -0500)
Two recent changes broke Tornado's multiprocess modes by initializing
the event loop before the fork in typical patterns.
- The removal of `io_loop` arguments moved a call to
  IOLoop.current() earlier in TCPServer's constructor.
- The change to `asyncio.Future` initialized the asyncio event loop at
  import time.

Move the call in TCPServer earlier (and audit the rest of that change
to make sure this didn't happen anywhere else) and replace
`gen.moment` and `gen._null_future` with dummy objects that do not
involve the asyncio event loop.

tornado/gen.py
tornado/queues.py
tornado/tcpserver.py
tornado/test/tcpserver_test.py

index ed76bc3179fb2cc1fe54549d618614b13f9e4377..b215ac4c443702339382be7de307e6cdac68ced1 100644 (file)
@@ -970,10 +970,25 @@ def sleep(duration):
     return f
 
 
-_null_future = Future()
-_null_future.set_result(None)
+class _NullFuture(object):
+    """_NullFuture resembles a Future that finished with a result of None.
 
-moment = Future()
+    It's not actually a `Future` to avoid depending on a particular event loop.
+    Handled as a special case in the coroutine runner.
+    """
+    def result(self):
+        return None
+
+    def done(self):
+        return True
+
+
+# _null_future is used as a dummy value in the coroutine runner. It differs
+# from moment in that moment always adds a delay of one IOLoop iteration
+# while _null_future is processed as soon as possible.
+_null_future = _NullFuture()
+
+moment = _NullFuture()
 moment.__doc__ = \
     """A special object which may be yielded to allow the IOLoop to run for
 one iteration.
@@ -989,7 +1004,6 @@ Usage: ``yield gen.moment``
    ``yield None`` (or ``yield`` with no argument) is now equivalent to
     ``yield gen.moment``.
 """
-moment.set_result(None)
 
 
 class Runner(object):
@@ -1287,8 +1301,10 @@ def convert_yielded(yielded):
     .. versionadded:: 4.1
     """
     # Lists and dicts containing YieldPoints were handled earlier.
-    if yielded is None:
+    if yielded is None or yielded is moment:
         return moment
+    elif yielded is _null_future:
+        return _null_future
     elif isinstance(yielded, (list, dict)):
         return multi(yielded)
     elif is_future(yielded):
index cba0aa7f29aafd706aa327fc194ac370d5b5fbc9..e792e0c04db609660d2b3ba919f38d8ac9c86ab5 100644 (file)
@@ -175,15 +175,15 @@ class Queue(object):
         `datetime.timedelta` object for a deadline relative to the
         current time.
         """
+        future = Future()
         try:
             self.put_nowait(item)
         except QueueFull:
-            future = Future()
             self._putters.append((item, future))
             _set_timeout(future, timeout)
-            return future
         else:
-            return gen._null_future
+            future.set_result(None)
+        return future
 
     def put_nowait(self, item):
         """Put an item into the queue without blocking.
index fea215f53ac85289110aa227a6a8433756990346..53cd26af409c45c7c904c48d85d1692cdb36603b 100644 (file)
@@ -108,7 +108,6 @@ class TCPServer(object):
     """
     def __init__(self, ssl_options=None, max_buffer_size=None,
                  read_chunk_size=None):
-        self.io_loop = IOLoop.current()
         self.ssl_options = ssl_options
         self._sockets = {}   # fd -> socket object
         self._handlers = {}  # fd -> remove_handler callable
@@ -296,7 +295,7 @@ class TCPServer(object):
 
             future = self.handle_stream(stream, address)
             if future is not None:
-                self.io_loop.add_future(gen.convert_yielded(future),
-                                        lambda f: f.result())
+                IOLoop.current().add_future(gen.convert_yielded(future),
+                                            lambda f: f.result())
         except Exception:
             app_log.error("Error in connection callback", exc_info=True)
index 0fe7a81acf1cbef3b65c313786162fe06124c157..4c238a321c7af6b03329e4004df6dfc41d2c038c 100644 (file)
@@ -1,13 +1,17 @@
 from __future__ import absolute_import, division, print_function
 
 import socket
+import subprocess
+import sys
+import textwrap
 
+from tornado.escape import utf8, to_unicode
 from tornado import gen
 from tornado.iostream import IOStream
 from tornado.log import app_log
 from tornado.stack_context import NullContext
 from tornado.tcpserver import TCPServer
-from tornado.test.util import skipBefore35, exec_test
+from tornado.test.util import skipBefore35, skipIfNonUnix, exec_test, unittest
 from tornado.testing import AsyncTestCase, ExpectLog, bind_unused_port, gen_test
 
 
@@ -114,3 +118,76 @@ class TCPServerTest(AsyncTestCase):
                 c.close()
 
         # Here tearDown() would re-raise the EBADF encountered in the IO loop
+
+
+@skipIfNonUnix
+class TestMultiprocess(unittest.TestCase):
+    # These tests verify that the two multiprocess examples from the
+    # TCPServer docs work. Both tests start a server with three worker
+    # processes, each of which prints its task id to stdout (a single
+    # byte, so we don't have to worry about atomicity of the shared
+    # stdout stream) and then exits.
+    def run_subproc(self, code):
+        proc = subprocess.Popen(sys.executable,
+                                stdin=subprocess.PIPE,
+                                stdout=subprocess.PIPE)
+        proc.stdin.write(utf8(code))
+        proc.stdin.close()
+        proc.wait()
+        stdout = proc.stdout.read()
+        proc.stdout.close()
+        if proc.returncode != 0:
+            raise RuntimeError("Process returned %d. stdout=%r" % (
+                proc.returncode, stdout))
+        return to_unicode(stdout)
+
+    def test_single(self):
+        # As a sanity check, run the single-process version through this test
+        # harness too.
+        code = textwrap.dedent("""
+            from __future__ import print_function
+            from tornado.ioloop import IOLoop
+            from tornado.tcpserver import TCPServer
+
+            server = TCPServer()
+            server.listen(0, address='127.0.0.1')
+            IOLoop.current().run_sync(lambda: None)
+            print('012', end='')
+        """)
+        out = self.run_subproc(code)
+        self.assertEqual(''.join(sorted(out)), "012")
+
+    def test_simple(self):
+        code = textwrap.dedent("""
+            from __future__ import print_function
+            from tornado.ioloop import IOLoop
+            from tornado.process import task_id
+            from tornado.tcpserver import TCPServer
+
+            server = TCPServer()
+            server.bind(0, address='127.0.0.1')
+            server.start(3)
+            IOLoop.current().run_sync(lambda: None)
+            print(task_id(), end='')
+        """)
+        out = self.run_subproc(code)
+        self.assertEqual(''.join(sorted(out)), "012")
+
+    def test_advanced(self):
+        code = textwrap.dedent("""
+            from __future__ import print_function
+            from tornado.ioloop import IOLoop
+            from tornado.netutil import bind_sockets
+            from tornado.process import fork_processes, task_id
+            from tornado.ioloop import IOLoop
+            from tornado.tcpserver import TCPServer
+
+            sockets = bind_sockets(0, address='127.0.0.1')
+            fork_processes(3)
+            server = TCPServer()
+            server.add_sockets(sockets)
+            IOLoop.current().run_sync(lambda: None)
+            print(task_id(), end='')
+        """)
+        out = self.run_subproc(code)
+        self.assertEqual(''.join(sorted(out)), "012")