.. versionchanged:: 5.0
This method also clears the current `asyncio` event loop.
"""
- old = IOLoop._current.instance
+ old = getattr(IOLoop._current, "instance", None)
if old is not None:
old._clear_current_hook()
IOLoop._current.instance = None
from __future__ import absolute_import, division, print_function
+from concurrent.futures import ThreadPoolExecutor
import contextlib
import datetime
import functools
self.assertIs(self.io_loop, IOLoop.current())
+class TestIOLoopCurrentAsync(AsyncTestCase):
+ @gen_test
+ def test_clear_without_current(self):
+ # If there is no current IOLoop, clear_current is a no-op (but
+ # should not fail). Use a thread so we see the threading.Local
+ # in a pristine state.
+ with ThreadPoolExecutor(1) as e:
+ yield e.submit(IOLoop.clear_current)
+
+
class TestIOLoopAddCallback(AsyncTestCase):
def setUp(self):
super(TestIOLoopAddCallback, self).setUp()