import traceback
import math
-from tornado.concurrent import TracebackFuture, is_future
+from tornado.concurrent import TracebackFuture, is_future, chain_future
from tornado.log import app_log, gen_log
from tornado.platform.auto import set_close_exec, Waker
from tornado import stack_context
from tornado.process import cpu_count
self._executor = ThreadPoolExecutor(max_workers=(cpu_count() * 5))
executor = self._executor
-
- return executor.submit(func, *args)
+ c_future = executor.submit(func, *args)
+ # Concurrent Futures are not usable with await. Wrap this in a
+ # Tornado Future instead, using self.add_future for thread-safety.
+ t_future = TracebackFuture()
+ self.add_future(c_future, lambda f: chain_future(f, t_future))
+ return t_future
def set_default_executor(self, executor):
"""Sets the default executor to use with :meth:`run_in_executor`."""
event1 = threading.Event()
event2 = threading.Event()
- def callback(self_event, other_event):
+ def sync_func(self_event, other_event):
self_event.set()
- time.sleep(0.01)
- self.assertTrue(other_event.is_set())
+ other_event.wait()
+ # Note that return value doesn't actually do anything,
+ # it is just passed through to our final assertion to
+ # make sure it is passed through properly.
return self_event
+ # Run two synchronous functions, which would deadlock if not
+ # run in parallel.
res = yield [
- IOLoop.current().run_in_executor(None, callback, event1, event2),
- IOLoop.current().run_in_executor(None, callback, event2, event1)
+ IOLoop.current().run_in_executor(None, sync_func, event1, event2),
+ IOLoop.current().run_in_executor(None, sync_func, event2, event1)
]
self.assertEqual([event1, event2], res)
@skipBefore35
+ @gen_test
def test_run_in_executor_native(self):
event1 = threading.Event()
event2 = threading.Event()
- def callback(self_event, other_event):
+ def sync_func(self_event, other_event):
self_event.set()
- time.sleep(0.01)
- self.assertTrue(other_event.is_set())
other_event.wait()
return self_event
+ # Go through an async wrapper to ensure that the result of
+ # run_in_executor works with await and not just gen.coroutine
+ # (simply passing the underlying concurrrent future would do that).
namespace = exec_test(globals(), locals(), """
- async def main():
- res = await gen.multi([
- IOLoop.current().run_in_executor(None, callback, event1, event2),
- IOLoop.current().run_in_executor(None, callback, event2, event1)
- ])
- self.assertEqual([event1, event2], res)
+ async def async_wrapper(self_event, other_event):
+ return await IOLoop.current().run_in_executor(
+ None, sync_func, self_event, other_event)
""")
- IOLoop.current().run_sync(namespace['main'])
+ res = yield [
+ namespace["async_wrapper"](event1, event2),
+ namespace["async_wrapper"](event2, event1)
+ ]
+
+ self.assertEqual([event1, event2], res)
+
+ @gen_test
def test_set_default_executor(self):
- class MyExecutor(futures.Executor):
+ count = [0]
+
+ class MyExecutor(futures.ThreadPoolExecutor):
def submit(self, func, *args):
- return Future()
+ count[0] += 1
+ return super(MyExecutor, self).submit(func, *args)
event = threading.Event()
- def future_func():
+ def sync_func():
event.set()
- executor = MyExecutor()
+ executor = MyExecutor(1)
loop = IOLoop.current()
loop.set_default_executor(executor)
- loop.run_in_executor(None, future_func)
- loop.add_timeout(0.01, lambda: self.assertFalse(event.is_set()))
+ yield loop.run_in_executor(None, sync_func)
+ self.assertEqual(1, count[0])
+ self.assertTrue(event.is_set())
class TestIOLoopRunSync(unittest.TestCase):