]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Fixes for IOLoop.run_in_executor 2178/head
authorBen Darnell <ben@bendarnell.com>
Sun, 22 Oct 2017 19:53:35 +0000 (15:53 -0400)
committerBen Darnell <ben@bendarnell.com>
Sun, 22 Oct 2017 20:29:58 +0000 (16:29 -0400)
Correctly transform the concurrent Future into a Tornado Future
and test that the result is usable with await.

tornado/ioloop.py
tornado/test/ioloop_test.py

index 5686576cf7dbe310c3e4d00c67f04ef3d5a0ec5d..a60632e48f188a5f47dcde65562dd2052be6ffdf 100644 (file)
@@ -44,7 +44,7 @@ import time
 import traceback
 import math
 
-from tornado.concurrent import TracebackFuture, is_future
+from tornado.concurrent import TracebackFuture, is_future, chain_future
 from tornado.log import app_log, gen_log
 from tornado.platform.auto import set_close_exec, Waker
 from tornado import stack_context
@@ -655,8 +655,12 @@ class IOLoop(Configurable):
                 from tornado.process import cpu_count
                 self._executor = ThreadPoolExecutor(max_workers=(cpu_count() * 5))
             executor = self._executor
-
-        return executor.submit(func, *args)
+        c_future = executor.submit(func, *args)
+        # Concurrent Futures are not usable with await. Wrap this in a
+        # Tornado Future instead, using self.add_future for thread-safety.
+        t_future = TracebackFuture()
+        self.add_future(c_future, lambda f: chain_future(f, t_future))
+        return t_future
 
     def set_default_executor(self, executor):
         """Sets the default executor to use with :meth:`run_in_executor`."""
index f3cd32ae42a5b4d1bc52c5cb760addb844bea0a8..603eb202d624ed6b40a92fb1bfbb2bfbdfe4ab54 100644 (file)
@@ -604,56 +604,70 @@ class TestIOLoopFutures(AsyncTestCase):
         event1 = threading.Event()
         event2 = threading.Event()
 
-        def callback(self_event, other_event):
+        def sync_func(self_event, other_event):
             self_event.set()
-            time.sleep(0.01)
-            self.assertTrue(other_event.is_set())
+            other_event.wait()
+            # Note that return value doesn't actually do anything,
+            # it is just passed through to our final assertion to
+            # make sure it is passed through properly.
             return self_event
 
+        # Run two synchronous functions, which would deadlock if not
+        # run in parallel.
         res = yield [
-            IOLoop.current().run_in_executor(None, callback, event1, event2),
-            IOLoop.current().run_in_executor(None, callback, event2, event1)
+            IOLoop.current().run_in_executor(None, sync_func, event1, event2),
+            IOLoop.current().run_in_executor(None, sync_func, event2, event1)
         ]
 
         self.assertEqual([event1, event2], res)
 
     @skipBefore35
+    @gen_test
     def test_run_in_executor_native(self):
         event1 = threading.Event()
         event2 = threading.Event()
 
-        def callback(self_event, other_event):
+        def sync_func(self_event, other_event):
             self_event.set()
-            time.sleep(0.01)
-            self.assertTrue(other_event.is_set())
             other_event.wait()
             return self_event
 
+        # Go through an async wrapper to ensure that the result of
+        # run_in_executor works with await and not just gen.coroutine
+        # (simply passing the underlying concurrrent future would do that).
         namespace = exec_test(globals(), locals(), """
-            async def main():
-                res = await gen.multi([
-                    IOLoop.current().run_in_executor(None, callback, event1, event2),
-                    IOLoop.current().run_in_executor(None, callback, event2, event1)
-                ])
-                self.assertEqual([event1, event2], res)
+            async def async_wrapper(self_event, other_event):
+                return await IOLoop.current().run_in_executor(
+                    None, sync_func, self_event, other_event)
         """)
-        IOLoop.current().run_sync(namespace['main'])
 
+        res = yield [
+            namespace["async_wrapper"](event1, event2),
+            namespace["async_wrapper"](event2, event1)
+            ]
+
+        self.assertEqual([event1, event2], res)
+
+    @gen_test
     def test_set_default_executor(self):
-        class MyExecutor(futures.Executor):
+        count = [0]
+
+        class MyExecutor(futures.ThreadPoolExecutor):
             def submit(self, func, *args):
-                return Future()
+                count[0] += 1
+                return super(MyExecutor, self).submit(func, *args)
 
         event = threading.Event()
 
-        def future_func():
+        def sync_func():
             event.set()
 
-        executor = MyExecutor()
+        executor = MyExecutor(1)
         loop = IOLoop.current()
         loop.set_default_executor(executor)
-        loop.run_in_executor(None, future_func)
-        loop.add_timeout(0.01, lambda: self.assertFalse(event.is_set()))
+        yield loop.run_in_executor(None, sync_func)
+        self.assertEqual(1, count[0])
+        self.assertTrue(event.is_set())
 
 
 class TestIOLoopRunSync(unittest.TestCase):