import traceback
import warnings
+try:
+ import asyncio
+except ImportError:
+ asyncio = None
+
@contextlib.contextmanager
def set_environ(name, value):
self.finished = True
+@unittest.skipIf(asyncio is None, "asyncio module not present")
+class GetNewIOLoopTest(AsyncTestCase):
+ def get_new_ioloop(self):
+ # Use the current loop instead of creating a new one here.
+ return ioloop.IOLoop.current()
+
+ def setUp(self):
+ # This simulates the effect of an asyncio test harness like
+ # pytest-asyncio.
+ self.orig_loop = asyncio.get_event_loop()
+ self.new_loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(self.new_loop)
+ super(GetNewIOLoopTest, self).setUp()
+
+ def tearDown(self):
+ super(GetNewIOLoopTest, self).tearDown()
+ # AsyncTestCase must not affect the existing asyncio loop.
+ self.assertFalse(asyncio.get_event_loop().is_closed())
+ asyncio.set_event_loop(self.orig_loop)
+ self.new_loop.close()
+
+ def test_loop(self):
+ self.assertIs(self.io_loop.asyncio_loop, self.new_loop)
+
+
if __name__ == '__main__':
unittest.main()
import unittest # type: ignore
+if asyncio is None:
+ _NON_OWNED_IOLOOPS = ()
+else:
+ import tornado.platform.asyncio
+ _NON_OWNED_IOLOOPS = tornado.platform.asyncio.AsyncIOMainLoop
+
def bind_unused_port(reuse_port=False):
"""Binds a server socket to an available port on localhost.
# Clean up Subprocess, so it can be used again with a new ioloop.
Subprocess.uninitialize()
self.io_loop.clear_current()
- # Try to clean up any file descriptors left open in the ioloop.
- # This avoids leaks, especially when tests are run repeatedly
- # in the same process with autoreload (because curl does not
- # set FD_CLOEXEC on its file descriptors)
- self.io_loop.close(all_fds=True)
+ if not isinstance(self.io_loop, _NON_OWNED_IOLOOPS):
+ # Try to clean up any file descriptors left open in the ioloop.
+ # This avoids leaks, especially when tests are run repeatedly
+ # in the same process with autoreload (because curl does not
+ # set FD_CLOEXEC on its file descriptors)
+ self.io_loop.close(all_fds=True)
super(AsyncTestCase, self).tearDown()
# In case an exception escaped or the StackContext caught an exception
# when there wasn't a wait() to re-raise it, do so here.
self.__rethrow()
def get_new_ioloop(self):
- """Creates a new `.IOLoop` for this test. May be overridden in
- subclasses for tests that require a specific `.IOLoop` (usually
- the singleton `.IOLoop.instance()`).
+ """Returns the `.IOLoop` to use for this test.
+
+ By default, a new `.IOLoop` is created for each test.
+ Subclasses may override this method to return
+ `.IOLoop.current()` if it is not appropriate to use a new
+ `.IOLoop` in each tests (for example, if there are global
+ singletons using the default `.IOLoop`) or if a per-test event
+ loop is being provided by another system (such as
+ ``pytest-asyncio``).
"""
return IOLoop()