From: Ben Darnell Date: Tue, 19 Dec 2017 03:56:40 +0000 (-0500) Subject: tcpserver: Fix multiprocess modes X-Git-Tag: v5.0.0~31^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=bf1b21ab4289d45c1292c6d73cafe4b5a1320b2b;p=thirdparty%2Ftornado.git tcpserver: Fix multiprocess modes Two recent changes broke Tornado's multiprocess modes by initializing the event loop before the fork in typical patterns. - The removal of `io_loop` arguments moved a call to IOLoop.current() earlier in TCPServer's constructor. - The change to `asyncio.Future` initialized the asyncio event loop at import time. Move the call in TCPServer earlier (and audit the rest of that change to make sure this didn't happen anywhere else) and replace `gen.moment` and `gen._null_future` with dummy objects that do not involve the asyncio event loop. --- diff --git a/tornado/gen.py b/tornado/gen.py index ed76bc317..b215ac4c4 100644 --- a/tornado/gen.py +++ b/tornado/gen.py @@ -970,10 +970,25 @@ def sleep(duration): return f -_null_future = Future() -_null_future.set_result(None) +class _NullFuture(object): + """_NullFuture resembles a Future that finished with a result of None. -moment = Future() + It's not actually a `Future` to avoid depending on a particular event loop. + Handled as a special case in the coroutine runner. + """ + def result(self): + return None + + def done(self): + return True + + +# _null_future is used as a dummy value in the coroutine runner. It differs +# from moment in that moment always adds a delay of one IOLoop iteration +# while _null_future is processed as soon as possible. +_null_future = _NullFuture() + +moment = _NullFuture() moment.__doc__ = \ """A special object which may be yielded to allow the IOLoop to run for one iteration. @@ -989,7 +1004,6 @@ Usage: ``yield gen.moment`` ``yield None`` (or ``yield`` with no argument) is now equivalent to ``yield gen.moment``. """ -moment.set_result(None) class Runner(object): @@ -1287,8 +1301,10 @@ def convert_yielded(yielded): .. versionadded:: 4.1 """ # Lists and dicts containing YieldPoints were handled earlier. - if yielded is None: + if yielded is None or yielded is moment: return moment + elif yielded is _null_future: + return _null_future elif isinstance(yielded, (list, dict)): return multi(yielded) elif is_future(yielded): diff --git a/tornado/queues.py b/tornado/queues.py index cba0aa7f2..e792e0c04 100644 --- a/tornado/queues.py +++ b/tornado/queues.py @@ -175,15 +175,15 @@ class Queue(object): `datetime.timedelta` object for a deadline relative to the current time. """ + future = Future() try: self.put_nowait(item) except QueueFull: - future = Future() self._putters.append((item, future)) _set_timeout(future, timeout) - return future else: - return gen._null_future + future.set_result(None) + return future def put_nowait(self, item): """Put an item into the queue without blocking. diff --git a/tornado/tcpserver.py b/tornado/tcpserver.py index fea215f53..53cd26af4 100644 --- a/tornado/tcpserver.py +++ b/tornado/tcpserver.py @@ -108,7 +108,6 @@ class TCPServer(object): """ def __init__(self, ssl_options=None, max_buffer_size=None, read_chunk_size=None): - self.io_loop = IOLoop.current() self.ssl_options = ssl_options self._sockets = {} # fd -> socket object self._handlers = {} # fd -> remove_handler callable @@ -296,7 +295,7 @@ class TCPServer(object): future = self.handle_stream(stream, address) if future is not None: - self.io_loop.add_future(gen.convert_yielded(future), - lambda f: f.result()) + IOLoop.current().add_future(gen.convert_yielded(future), + lambda f: f.result()) except Exception: app_log.error("Error in connection callback", exc_info=True) diff --git a/tornado/test/tcpserver_test.py b/tornado/test/tcpserver_test.py index 0fe7a81ac..4c238a321 100644 --- a/tornado/test/tcpserver_test.py +++ b/tornado/test/tcpserver_test.py @@ -1,13 +1,17 @@ from __future__ import absolute_import, division, print_function import socket +import subprocess +import sys +import textwrap +from tornado.escape import utf8, to_unicode from tornado import gen from tornado.iostream import IOStream from tornado.log import app_log from tornado.stack_context import NullContext from tornado.tcpserver import TCPServer -from tornado.test.util import skipBefore35, exec_test +from tornado.test.util import skipBefore35, skipIfNonUnix, exec_test, unittest from tornado.testing import AsyncTestCase, ExpectLog, bind_unused_port, gen_test @@ -114,3 +118,76 @@ class TCPServerTest(AsyncTestCase): c.close() # Here tearDown() would re-raise the EBADF encountered in the IO loop + + +@skipIfNonUnix +class TestMultiprocess(unittest.TestCase): + # These tests verify that the two multiprocess examples from the + # TCPServer docs work. Both tests start a server with three worker + # processes, each of which prints its task id to stdout (a single + # byte, so we don't have to worry about atomicity of the shared + # stdout stream) and then exits. + def run_subproc(self, code): + proc = subprocess.Popen(sys.executable, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE) + proc.stdin.write(utf8(code)) + proc.stdin.close() + proc.wait() + stdout = proc.stdout.read() + proc.stdout.close() + if proc.returncode != 0: + raise RuntimeError("Process returned %d. stdout=%r" % ( + proc.returncode, stdout)) + return to_unicode(stdout) + + def test_single(self): + # As a sanity check, run the single-process version through this test + # harness too. + code = textwrap.dedent(""" + from __future__ import print_function + from tornado.ioloop import IOLoop + from tornado.tcpserver import TCPServer + + server = TCPServer() + server.listen(0, address='127.0.0.1') + IOLoop.current().run_sync(lambda: None) + print('012', end='') + """) + out = self.run_subproc(code) + self.assertEqual(''.join(sorted(out)), "012") + + def test_simple(self): + code = textwrap.dedent(""" + from __future__ import print_function + from tornado.ioloop import IOLoop + from tornado.process import task_id + from tornado.tcpserver import TCPServer + + server = TCPServer() + server.bind(0, address='127.0.0.1') + server.start(3) + IOLoop.current().run_sync(lambda: None) + print(task_id(), end='') + """) + out = self.run_subproc(code) + self.assertEqual(''.join(sorted(out)), "012") + + def test_advanced(self): + code = textwrap.dedent(""" + from __future__ import print_function + from tornado.ioloop import IOLoop + from tornado.netutil import bind_sockets + from tornado.process import fork_processes, task_id + from tornado.ioloop import IOLoop + from tornado.tcpserver import TCPServer + + sockets = bind_sockets(0, address='127.0.0.1') + fork_processes(3) + server = TCPServer() + server.add_sockets(sockets) + IOLoop.current().run_sync(lambda: None) + print(task_id(), end='') + """) + out = self.run_subproc(code) + self.assertEqual(''.join(sorted(out)), "012")