From a3b44cd701e0e82693363701bc0346b0125d2362 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Sun, 22 Oct 2017 15:53:35 -0400 Subject: [PATCH] Fixes for IOLoop.run_in_executor Correctly transform the concurrent Future into a Tornado Future and test that the result is usable with await. --- tornado/ioloop.py | 10 +++++-- tornado/test/ioloop_test.py | 56 +++++++++++++++++++++++-------------- 2 files changed, 42 insertions(+), 24 deletions(-) diff --git a/tornado/ioloop.py b/tornado/ioloop.py index 5686576cf..a60632e48 100644 --- a/tornado/ioloop.py +++ b/tornado/ioloop.py @@ -44,7 +44,7 @@ import time import traceback import math -from tornado.concurrent import TracebackFuture, is_future +from tornado.concurrent import TracebackFuture, is_future, chain_future from tornado.log import app_log, gen_log from tornado.platform.auto import set_close_exec, Waker from tornado import stack_context @@ -655,8 +655,12 @@ class IOLoop(Configurable): from tornado.process import cpu_count self._executor = ThreadPoolExecutor(max_workers=(cpu_count() * 5)) executor = self._executor - - return executor.submit(func, *args) + c_future = executor.submit(func, *args) + # Concurrent Futures are not usable with await. Wrap this in a + # Tornado Future instead, using self.add_future for thread-safety. + t_future = TracebackFuture() + self.add_future(c_future, lambda f: chain_future(f, t_future)) + return t_future def set_default_executor(self, executor): """Sets the default executor to use with :meth:`run_in_executor`.""" diff --git a/tornado/test/ioloop_test.py b/tornado/test/ioloop_test.py index f3cd32ae4..603eb202d 100644 --- a/tornado/test/ioloop_test.py +++ b/tornado/test/ioloop_test.py @@ -604,56 +604,70 @@ class TestIOLoopFutures(AsyncTestCase): event1 = threading.Event() event2 = threading.Event() - def callback(self_event, other_event): + def sync_func(self_event, other_event): self_event.set() - time.sleep(0.01) - self.assertTrue(other_event.is_set()) + other_event.wait() + # Note that return value doesn't actually do anything, + # it is just passed through to our final assertion to + # make sure it is passed through properly. return self_event + # Run two synchronous functions, which would deadlock if not + # run in parallel. res = yield [ - IOLoop.current().run_in_executor(None, callback, event1, event2), - IOLoop.current().run_in_executor(None, callback, event2, event1) + IOLoop.current().run_in_executor(None, sync_func, event1, event2), + IOLoop.current().run_in_executor(None, sync_func, event2, event1) ] self.assertEqual([event1, event2], res) @skipBefore35 + @gen_test def test_run_in_executor_native(self): event1 = threading.Event() event2 = threading.Event() - def callback(self_event, other_event): + def sync_func(self_event, other_event): self_event.set() - time.sleep(0.01) - self.assertTrue(other_event.is_set()) other_event.wait() return self_event + # Go through an async wrapper to ensure that the result of + # run_in_executor works with await and not just gen.coroutine + # (simply passing the underlying concurrrent future would do that). namespace = exec_test(globals(), locals(), """ - async def main(): - res = await gen.multi([ - IOLoop.current().run_in_executor(None, callback, event1, event2), - IOLoop.current().run_in_executor(None, callback, event2, event1) - ]) - self.assertEqual([event1, event2], res) + async def async_wrapper(self_event, other_event): + return await IOLoop.current().run_in_executor( + None, sync_func, self_event, other_event) """) - IOLoop.current().run_sync(namespace['main']) + res = yield [ + namespace["async_wrapper"](event1, event2), + namespace["async_wrapper"](event2, event1) + ] + + self.assertEqual([event1, event2], res) + + @gen_test def test_set_default_executor(self): - class MyExecutor(futures.Executor): + count = [0] + + class MyExecutor(futures.ThreadPoolExecutor): def submit(self, func, *args): - return Future() + count[0] += 1 + return super(MyExecutor, self).submit(func, *args) event = threading.Event() - def future_func(): + def sync_func(): event.set() - executor = MyExecutor() + executor = MyExecutor(1) loop = IOLoop.current() loop.set_default_executor(executor) - loop.run_in_executor(None, future_func) - loop.add_timeout(0.01, lambda: self.assertFalse(event.is_set())) + yield loop.run_in_executor(None, sync_func) + self.assertEqual(1, count[0]) + self.assertTrue(event.is_set()) class TestIOLoopRunSync(unittest.TestCase): -- 2.47.2