import time
import traceback
import math
-import weakref
from tornado.concurrent import Future, is_future, chain_future, future_set_exc_info, future_add_done_callback # noqa: E501
from tornado.log import app_log, gen_log
_current = threading.local()
# In Python 3, _ioloop_for_asyncio maps from asyncio loops to IOLoops.
- _ioloop_for_asyncio = weakref.WeakKeyDictionary()
+ _ioloop_for_asyncio = dict()
@classmethod
def configure(cls, impl, **kwargs):
self.readers = set()
self.writers = set()
self.closing = False
+ # If an asyncio loop was closed through an asyncio interface
+ # instead of IOLoop.close(), we'd never hear about it and may
+ # have left a dangling reference in our map. In case an
+ # application (or, more likely, a test suite) creates and
+ # destroys a lot of event loops in this way, check here to
+ # ensure that we don't have a lot of dead loops building up in
+ # the map.
+ #
+ # TODO(bdarnell): consider making self.asyncio_loop a weakref
+ # for AsyncIOMainLoop and make _ioloop_for_asyncio a
+ # WeakKeyDictionary.
+ for loop in list(IOLoop._ioloop_for_asyncio):
+ if loop.is_closed():
+ del IOLoop._ioloop_for_asyncio[loop]
IOLoop._ioloop_for_asyncio[asyncio_loop] = self
super(BaseAsyncIOLoop, self).initialize(**kwargs)
if all_fds:
self.close_fd(fileobj)
self.asyncio_loop.close()
+ del IOLoop._ioloop_for_asyncio[self.asyncio_loop]
def add_handler(self, fd, handler, events):
fd, fileobj = self.split_fd(fd)
42)
+@unittest.skipIf(asyncio is None, "asyncio module not present")
+class LeakTest(unittest.TestCase):
+ def setUp(self):
+ # Trigger a cleanup of the mapping so we start with a clean slate.
+ AsyncIOLoop().close()
+ # If we don't clean up after ourselves other tests may fail on
+ # py34.
+ self.orig_policy = asyncio.get_event_loop_policy()
+ asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
+
+ def tearDown(self):
+ asyncio.get_event_loop().close()
+ asyncio.set_event_loop_policy(self.orig_policy)
+
+ def test_ioloop_close_leak(self):
+ orig_count = len(IOLoop._ioloop_for_asyncio)
+ for i in range(10):
+ # Create and close an AsyncIOLoop using Tornado interfaces.
+ loop = AsyncIOLoop()
+ loop.close()
+ new_count = len(IOLoop._ioloop_for_asyncio) - orig_count
+ self.assertEqual(new_count, 0)
+
+ def test_asyncio_close_leak(self):
+ orig_count = len(IOLoop._ioloop_for_asyncio)
+ for i in range(10):
+ # Create and close an AsyncIOMainLoop using asyncio interfaces.
+ loop = asyncio.new_event_loop()
+ loop.call_soon(IOLoop.current)
+ loop.call_soon(loop.stop)
+ loop.run_forever()
+ loop.close()
+ new_count = len(IOLoop._ioloop_for_asyncio) - orig_count
+ # Because the cleanup is run on new loop creation, we have one
+ # dangling entry in the map (but only one).
+ self.assertEqual(new_count, 1)
+
+
@unittest.skipIf(asyncio is None, "asyncio module not present")
class AnyThreadEventLoopPolicyTest(unittest.TestCase):
def setUp(self):