according to spec: https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.close
The call to _wake_selector would fail with EBADF when close is called a second time
but can be attached to a running asyncio loop.
"""
+ _closed = False
+
def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
self._real_loop = real_loop
self._waker_w.close()
def close(self) -> None:
+ if self._closed:
+ return
with self._select_cond:
self._closing_selector = True
self._select_cond.notify()
_selector_loops.discard(self)
self._waker_r.close()
self._waker_w.close()
+ self._closed = True
def _wake_selector(self) -> None:
try:
AsyncIOLoop,
to_asyncio_future,
AnyThreadEventLoopPolicy,
+ AddThreadSelectorEventLoop,
)
from tornado.testing import AsyncTestCase, gen_test
42,
)
+ def test_add_thread_close_idempotent(self):
+ loop = AddThreadSelectorEventLoop(asyncio.get_event_loop())
+ loop.close()
+ loop.close()
+
class LeakTest(unittest.TestCase):
def setUp(self):