future = self.executor.submit(get_and_close_event_loop)
return future.result()
- def run_policy_test(self, accessor, expected_type):
+ def test_asyncio_accessor(self):
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
# With the default policy, non-main threads don't get an event
# loop.
self.assertRaises(
- (RuntimeError, AssertionError), self.executor.submit(accessor).result
+ RuntimeError, self.executor.submit(asyncio.get_event_loop).result
)
# Set the policy and we can get a loop.
asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())
self.assertIsInstance(
- self.executor.submit(accessor).result(), expected_type
+ self.executor.submit(asyncio.get_event_loop).result(), asyncio.AbstractEventLoop
)
# Clean up to silence leak warnings. Always use asyncio since
# IOLoop doesn't (currently) close the underlying loop.
self.executor.submit(lambda: asyncio.get_event_loop().close()).result() # type: ignore
- def test_asyncio_accessor(self):
- self.run_policy_test(asyncio.get_event_loop, asyncio.AbstractEventLoop)
-
def test_tornado_accessor(self):
- self.run_policy_test(IOLoop.current, IOLoop)
+ # Tornado's IOLoop.current() API can create a loop for any thread,
+ # regardless of this event loop policy.
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ self.assertIsInstance(
+ self.executor.submit(IOLoop.current).result(), IOLoop
+ )
+ # Clean up to silence leak warnings. Always use asyncio since
+ # IOLoop doesn't (currently) close the underlying loop.
+ self.executor.submit(lambda: asyncio.get_event_loop().close()).result() # type: ignore
+
+ asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())
+ self.assertIsInstance(
+ self.executor.submit(IOLoop.current).result(), IOLoop
+ )
+ self.executor.submit(lambda: asyncio.get_event_loop().close()).result() # type: ignore