loop._waker_w.send(b"a")
except BlockingIOError:
pass
- # If we don't join our (daemon) thread here, we may get a deadlock
- # during interpreter shutdown. I don't really understand why. This
- # deadlock happens every time in CI (both travis and appveyor) but
- # I've never been able to reproduce locally.
- loop._thread.join()
+ if loop._thread is not None:
+ # If we don't join our (daemon) thread here, we may get a deadlock
+ # during interpreter shutdown. I don't really understand why. This
+ # deadlock happens every time in CI (both travis and appveyor) but
+ # I've never been able to reproduce locally.
+ loop._thread.join()
_selector_loops.clear()
def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
self._real_loop = real_loop
- # Create a thread to run the select system call. We manage this thread
- # manually so we can trigger a clean shutdown from an atexit hook. Note
- # that due to the order of operations at shutdown, only daemon threads
- # can be shut down in this way (non-daemon threads would require the
- # introduction of a new hook: https://bugs.python.org/issue41962)
self._select_cond = threading.Condition()
self._select_args = (
None
) # type: Optional[Tuple[List[_FileDescriptorLike], List[_FileDescriptorLike]]]
self._closing_selector = False
- self._thread = threading.Thread(
- name="Tornado selector",
- daemon=True,
- target=self._run_select,
+ self._thread = None # type: Optional[threading.Thread]
+ self._thread_manager_handle = self._thread_manager()
+
+ async def thread_manager_anext() -> None:
+ # the anext builtin wasn't added until 3.10. We just need to iterate
+ # this generator one step.
+ await self._thread_manager_handle.__anext__()
+
+ # When the loop starts, start the thread. Not too soon because we can't
+ # clean up if we get to this point but the event loop is closed without
+ # starting.
+ self._real_loop.call_soon(
+ lambda: self._real_loop.create_task(thread_manager_anext())
)
- self._thread.start()
- # Start the select loop once the loop is started.
- self._real_loop.call_soon(self._start_select)
self._readers = {} # type: Dict[_FileDescriptorLike, Callable]
self._writers = {} # type: Dict[_FileDescriptorLike, Callable]
_selector_loops.add(self)
self.add_reader(self._waker_r, self._consume_waker)
- def __del__(self) -> None:
- # If the top-level application code uses asyncio interfaces to
- # start and stop the event loop, no objects created in Tornado
- # can get a clean shutdown notification. If we're just left to
- # be GC'd, we must explicitly close our sockets to avoid
- # logging warnings.
- _selector_loops.discard(self)
- self._waker_r.close()
- self._waker_w.close()
-
def close(self) -> None:
if self._closed:
return
self._closing_selector = True
self._select_cond.notify()
self._wake_selector()
- self._thread.join()
+ if self._thread is not None:
+ self._thread.join()
_selector_loops.discard(self)
+ self.remove_reader(self._waker_r)
self._waker_r.close()
self._waker_w.close()
self._closed = True
+ async def _thread_manager(self) -> typing.AsyncGenerator[None, None]:
+ # Create a thread to run the select system call. We manage this thread
+ # manually so we can trigger a clean shutdown from an atexit hook. Note
+ # that due to the order of operations at shutdown, only daemon threads
+ # can be shut down in this way (non-daemon threads would require the
+ # introduction of a new hook: https://bugs.python.org/issue41962)
+ self._thread = threading.Thread(
+ name="Tornado selector",
+ daemon=True,
+ target=self._run_select,
+ )
+ self._thread.start()
+ self._start_select()
+ try:
+ # The presense of this yield statement means that this coroutine
+ # is actually an asynchronous generator, which has a special
+ # shutdown protocol. We wait at this yield point until the
+ # event loop's shutdown_asyncgens method is called, at which point
+ # we will get a GeneratorExit exception and can shut down the
+ # selector thread.
+ yield
+ except GeneratorExit:
+ self.close()
+ raise
+
def _wake_selector(self) -> None:
+ if self._closed:
+ return
try:
self._waker_w.send(b"a")
except BlockingIOError:
# under the License.
import asyncio
+import threading
+import time
import unittest
import warnings
self.assertEqual(new_count, 1)
+class SelectorThreadLeakTest(unittest.TestCase):
+ # These tests are only relevant on windows, but they should pass anywhere.
+ def setUp(self):
+ # As a precaution, ensure that we've run an event loop at least once
+ # so if it spins up any singleton threads they're already there.
+ asyncio.run(self.dummy_tornado_coroutine())
+ self.orig_thread_count = threading.active_count()
+
+ def assert_no_thread_leak(self):
+ # For some reason we see transient failures here, but I haven't been able
+ # to catch it to identify which thread is causing it. Whatever thread it
+ # is, it appears to quickly clean up on its own, so just retry a few times.
+ deadline = time.time() + 1
+ while time.time() < deadline:
+ threads = list(threading.enumerate())
+ if len(threads) == self.orig_thread_count:
+ break
+ time.sleep(0.1)
+ self.assertEqual(self.orig_thread_count, len(threads), threads)
+
+ async def dummy_tornado_coroutine(self):
+ # Just access the IOLoop to initialize the selector thread.
+ IOLoop.current()
+
+ def test_asyncio_run(self):
+ for i in range(10):
+ # asyncio.run calls shutdown_asyncgens for us.
+ asyncio.run(self.dummy_tornado_coroutine())
+ self.assert_no_thread_leak()
+
+ def test_asyncio_manual(self):
+ for i in range(10):
+ loop = asyncio.new_event_loop()
+ loop.run_until_complete(self.dummy_tornado_coroutine())
+ # Without this step, we'd leak the thread.
+ loop.run_until_complete(loop.shutdown_asyncgens())
+ loop.close()
+ self.assert_no_thread_leak()
+
+ def test_tornado(self):
+ for i in range(10):
+ # The IOLoop interfaces are aware of the selector thread and
+ # (synchronously) shut it down.
+ loop = IOLoop(make_current=False)
+ loop.run_sync(self.dummy_tornado_coroutine)
+ loop.close()
+ self.assert_no_thread_leak()
+
+
class AnyThreadEventLoopPolicyTest(unittest.TestCase):
def setUp(self):
self.orig_policy = asyncio.get_event_loop_policy()