@property
def asyncio_loop(self):
- return self.io_loop.asyncio_loop
+ return self.io_loop.asyncio_loop # type: ignore
def test_asyncio_callback(self):
# Basic test that the asyncio loop is set up correctly.
# Set the policy and we can get a loop.
asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())
self.assertIsInstance(
- self.executor.submit(asyncio.get_event_loop).result(), asyncio.AbstractEventLoop
+ self.executor.submit(asyncio.get_event_loop).result(),
+ asyncio.AbstractEventLoop,
)
# Clean up to silence leak warnings. Always use asyncio since
# IOLoop doesn't (currently) close the underlying loop.
# regardless of this event loop policy.
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
- self.assertIsInstance(
- self.executor.submit(IOLoop.current).result(), IOLoop
- )
+ self.assertIsInstance(self.executor.submit(IOLoop.current).result(), IOLoop)
# Clean up to silence leak warnings. Always use asyncio since
# IOLoop doesn't (currently) close the underlying loop.
self.executor.submit(lambda: asyncio.get_event_loop().close()).result() # type: ignore
asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())
- self.assertIsInstance(
- self.executor.submit(IOLoop.current).result(), IOLoop
- )
+ self.assertIsInstance(self.executor.submit(IOLoop.current).result(), IOLoop)
self.executor.submit(lambda: asyncio.get_event_loop().close()).result() # type: ignore