so this method is now equivalent to `tornado.gen.convert_yielded`.
"""
return convert_yielded(tornado_future)
+
+
+class AnyThreadEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
+ """Event loop policy that allows loop creation on any thread.
+
+ The default `asyncio` event loop policy only automatically creates
+ event loops in the main threads. Other threads must create event
+ loops explicitly or `asyncio.get_event_loop` (and therefore
+ `.IOLoop.current`) will fail. Installing this policy allows event
+ loops to be created automatically on any thread, matching the
+ behavior of Tornado versions prior to 5.0 (or 5.0 on Python 2).
+
+ Usage::
+
+ asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())
+
+ .. versionadded:: 5.0
+
+ """
+ def get_event_loop(self):
+ try:
+ return super().get_event_loop()
+ except RuntimeError:
+ # "There is no current event loop in thread %r"
+ loop = self.new_event_loop()
+ self.set_event_loop(loop)
+ return loop
from __future__ import absolute_import, division, print_function
+from concurrent.futures import ThreadPoolExecutor
from tornado import gen
+from tornado.ioloop import IOLoop
from tornado.testing import AsyncTestCase, gen_test
from tornado.test.util import unittest, skipBefore33, skipBefore35, exec_test
except ImportError:
asyncio = None
else:
- from tornado.platform.asyncio import AsyncIOLoop, to_asyncio_future
+ from tornado.platform.asyncio import AsyncIOLoop, to_asyncio_future, AnyThreadEventLoopPolicy
# This is used in dynamically-evaluated code, so silence pyflakes.
to_asyncio_future
asyncio.get_event_loop().run_until_complete(
native_coroutine_with_adapter2()),
42)
+
+
+@unittest.skipIf(asyncio is None, "asyncio module not present")
+class AnyThreadEventLoopPolicyTest(unittest.TestCase):
+ def setUp(self):
+ self.orig_policy = asyncio.get_event_loop_policy()
+ self.executor = ThreadPoolExecutor(1)
+
+ def tearDown(self):
+ asyncio.set_event_loop_policy(self.orig_policy)
+ self.executor.shutdown()
+
+ def get_event_loop_on_thread(self):
+ def get_and_close_event_loop():
+ """Get the event loop. Close it if one is returned.
+
+ Returns the (closed) event loop. This is a silly thing
+ to do and leaves the thread in a broken state, but it's
+ enough for this test. Closing the loop avoids resource
+ leak warnings.
+ """
+ loop = asyncio.get_event_loop()
+ loop.close()
+ return loop
+ future = self.executor.submit(get_and_close_event_loop)
+ return future.result()
+
+ def run_policy_test(self, accessor, expected_type):
+ # With the default policy, non-main threads don't get an event
+ # loop.
+ self.assertRaises(RuntimeError,
+ self.executor.submit(accessor).result)
+ # Set the policy and we can get a loop.
+ asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())
+ self.assertIsInstance(
+ self.executor.submit(accessor).result(),
+ expected_type)
+ # Clean up to silence leak warnings. Always use asyncio since
+ # IOLoop doesn't (currently) close the underlying loop.
+ self.executor.submit(lambda: asyncio.get_event_loop().close()).result()
+
+ def test_asyncio_accessor(self):
+ self.run_policy_test(asyncio.get_event_loop, asyncio.AbstractEventLoop)
+
+ def test_tornado_accessor(self):
+ self.run_policy_test(IOLoop.current, IOLoop)