self.finished = True
-class GetNewIOLoopTest(AsyncTestCase):
- def get_new_ioloop(self):
- # Use the current loop instead of creating a new one here.
- return ioloop.IOLoop.current()
-
- def setUp(self):
- # This simulates the effect of an asyncio test harness like
- # pytest-asyncio.
- with ignore_deprecation():
- try:
- self.orig_loop = asyncio.get_event_loop()
- except RuntimeError:
- self.orig_loop = None # type: ignore[assignment]
- self.new_loop = asyncio.new_event_loop()
- asyncio.set_event_loop(self.new_loop)
- super().setUp()
-
- def tearDown(self):
- super().tearDown()
- # AsyncTestCase must not affect the existing asyncio loop.
- self.assertFalse(asyncio.get_event_loop().is_closed())
- asyncio.set_event_loop(self.orig_loop)
- self.new_loop.close()
-
- def test_loop(self):
- self.assertIs(self.io_loop.asyncio_loop, self.new_loop) # type: ignore
-
-
if __name__ == "__main__":
unittest.main()
module=r"tornado\..*",
)
super().setUp()
- try:
- self._prev_aio_loop = asyncio.get_event_loop()
- except RuntimeError:
- self._prev_aio_loop = None # type: ignore[assignment]
self.io_loop = self.get_new_ioloop()
asyncio.set_event_loop(self.io_loop.asyncio_loop) # type: ignore[attr-defined]
# Clean up Subprocess, so it can be used again with a new ioloop.
Subprocess.uninitialize()
- asyncio.set_event_loop(self._prev_aio_loop)
+ asyncio.set_event_loop(None)
if not isinstance(self.io_loop, _NON_OWNED_IOLOOPS):
# Try to clean up any file descriptors left open in the ioloop.
# This avoids leaks, especially when tests are run repeatedly