import asyncio
import atexit
import concurrent.futures
+import errno
import functools
-import itertools
+import select
+import socket
import sys
import threading
import typing
from tornado.gen import convert_yielded
from tornado.ioloop import IOLoop, _Selectable
-from typing import Any, TypeVar, Awaitable, Callable, Union, Optional
+from typing import Any, TypeVar, Awaitable, Callable, Union, Optional, List, Tuple, Dict
if typing.TYPE_CHECKING:
- from typing import Set, Dict, Tuple # noqa: F401
-
-_T = TypeVar("_T")
-
-
-class _HasFileno(typing.Protocol):
- def fileno(self) -> int:
- pass
+ from typing import Set # noqa: F401
+ from typing_extensions import Protocol
+ class _HasFileno(Protocol):
+ def fileno(self) -> int:
+ pass
-_FileDescriptorLike = Union[int, _HasFileno]
+ _FileDescriptorLike = Union[int, _HasFileno]
+_T = TypeVar("_T")
-_seq_gen = itertools.count()
-_atexit_run = False
+# Collection of sockets to write to at shutdown to wake up any selector threads.
+_waker_sockets = set() # type: Set[socket.socket]
def _atexit_callback() -> None:
- global _atexit_run
- _atexit_run = True
+ for fd in _waker_sockets:
+ try:
+ fd.send(b"a")
+ except BlockingIOError:
+ pass
+# atexit callbacks are run in LIFO order. Our callback must run before
+# ThreadPoolExecutor's or it will deadlock (the pool's threads can't
+# finish their work items until we write to their waker sockets). In
+# recent versions of Python the thread pool atexit callback is
+# registered in a getattr hook the first time TPE is *referenced*
+# (instead of older versions of python where it was registered when
+# concurrent.futures was imported).
+concurrent.futures.ThreadPoolExecutor
atexit.register(_atexit_callback)
# as windows where the default event loop does not implement these methods.
self.selector_loop = asyncio_loop
if hasattr(asyncio, "ProactorEventLoop") and isinstance(
- asyncio_loop, asyncio.ProactorEventLoop
+ asyncio_loop, asyncio.ProactorEventLoop # type: ignore
):
# Ignore this line for mypy because the abstract method checker
# doesn't understand dynamic proxies.
self,
executor: Optional[concurrent.futures.Executor],
func: Callable[..., _T],
- *args: Any,
+ *args: Any
) -> Awaitable[_T]:
return self.asyncio_loop.run_in_executor(executor, func, *args)
class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
"""Wrap an event loop to add implementations of the ``add_reader`` method family.
- Instances of this class start a second thread to run a selector-based event loop.
- This thread is completely hidden from the user; all callbacks are run on the
- wrapped event loop's thread.
+ Instances of this class start a second thread to run a selector.
+ This thread is completely hidden from the user; all callbacks are
+ run on the wrapped event loop's thread.
This class is used automatically by Tornado; applications should not need
to refer to it directly.
for event loops that do not implement the ``add_reader`` family of methods
themselves (i.e. ``WindowsProactorEventLoop``)
- Closing the ``AddThreadSelectorEventLoop`` does not close the wrapped event loop.
+ Closing the ``AddThreadSelectorEventLoop`` also closes the wrapped event loop.
+
"""
# This class is a __getattribute__-based proxy. All attributes other than those
# in this set are proxied through to the underlying loop.
MY_ATTRIBUTES = {
+ "_consume_waker",
+ "_executor",
+ "_handle_event",
+ "_readers",
+ "_real_loop",
+ "_run_select",
+ "_select_loop",
+ "_selector_task",
+ "_start_selector",
+ "_wake_selector",
+ "_waker_r",
+ "_waker_w",
+ "_writers",
"add_reader",
"add_writer",
+ "close",
"remove_reader",
"remove_writer",
- "close",
- "_real_loop",
- "_selector_loop",
- "_selector_thread",
- "_run_on_selector",
- "_handle_event_from_selector",
- "_reader_seq",
- "_writer_seq",
}
+ def __getattribute__(self, name: str) -> Any:
+ if name in AddThreadSelectorEventLoop.MY_ATTRIBUTES:
+ return super().__getattribute__(name)
+ return getattr(self._real_loop, name)
+
def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
self._real_loop = real_loop
-
- # Sequence numbers allow us to detect races between the selector thread
- # and the main thread, such as when a handler for a file descriptor has
- # been removed and re-added. These maps go from file descriptor to a
- # sequence number.
- self._reader_seq = {} # type: Dict[_FileDescriptorLike, int]
- self._writer_seq = {} # type: Dict[_FileDescriptorLike, int]
-
- fut = (
- concurrent.futures.Future()
- ) # type: concurrent.futures.Future[asyncio.AbstractEventLoop]
-
- def f() -> None:
- loop = asyncio.SelectorEventLoop()
- fut.set_result(loop)
- loop.run_forever()
- loop.close()
-
- self._selector_thread = threading.Thread(target=f)
- # Must be a daemon in case this event loop is not explicitly closed
- # (often the case for the main loop).
- self._selector_thread.daemon = True
- self._selector_thread.start()
- self._selector_loop = fut.result()
+ # Create our own executor to ensure we always have a thread
+ # available (we'll keep it 100% busy) instead of contending
+ # with the application for a thread in the default executor.
+ self._executor = concurrent.futures.ThreadPoolExecutor(1)
+ self._selector_task = None
+ # Start the select loop once the loop is started.
+ self._real_loop.call_soon(self._start_selector)
+
+ self._readers = {} # type: Dict[_FileDescriptorLike, Callable]
+ self._writers = {} # type: Dict[_FileDescriptorLike, Callable]
+
+ # Writing to _waker_w will wake up the selector thread, which
+ # watches for _waker_r to be readable.
+ self._waker_r, self._waker_w = socket.socketpair()
+ self._waker_r.setblocking(False)
+ self._waker_w.setblocking(False)
+ _waker_sockets.add(self._waker_w)
+ self.add_reader(self._waker_r, self._consume_waker)
+
+ def __del__(self) -> None:
+ # If the top-level application code uses asyncio interfaces to
+ # start and stop the event loop, no objects created in Tornado
+ # can get a clean shutdown notification. If we're just left to
+ # be GC'd, we must explicitly close our sockets to avoid
+ # logging warnings.
+ _waker_sockets.discard(self._waker_w)
+ self._waker_r.close()
+ self._waker_w.close()
def close(self) -> None:
- self._selector_loop.call_soon_threadsafe(self._selector_loop.stop)
- if not _atexit_run:
- # Shutdown is tricky: Our thread must be set as a daemon so that it
- # doesn't prevent shutdown in the common case of an unclosed main
- # loop. But daemon threads are halted relatively early in the
- # interpreter shutdown process; once this happens attempts to join
- # them will block forever.
- #
- # I can't find formal documentation of this, but as of cpython 3.8
- # the shutdown order is
- # 1. atexit functions
- # 2. daemon threads halt
- # 3. global destructors run
- #
- # If we're running after atexit functions, we're probably in a
- # global destructor. But in any case, we know that the process is
- # about to exit and it's no longer necessary to join our daemon
- # thread. (Is it ever necessary to join it? Probably not but it
- # feels dirty not to)
- self._selector_thread.join()
+ if self._selector_task is not None:
+ self._selector_task.cancel()
+ try:
+ # Cancellation is not immediate (coroutines are given
+ # a chance to catch the error and recover) so we must
+ # restart the loop here to allow our selector task to
+ # finish and avoid logging warnings at shutdown.
+ self._real_loop.run_until_complete(self._selector_task)
+ except asyncio.CancelledError:
+ pass
+ self._wake_selector()
+ self._executor.shutdown()
+ _waker_sockets.discard(self._waker_w)
+ self._waker_r.close()
+ self._waker_w.close()
self._real_loop.close()
- def __getattribute__(self, name: str) -> Any:
- if name in AddThreadSelectorEventLoop.MY_ATTRIBUTES:
- return super().__getattribute__(name)
- return getattr(self._real_loop, name)
+ def _wake_selector(self) -> None:
+ try:
+ self._waker_w.send(b"a")
+ except BlockingIOError:
+ pass
- def _run_on_selector(self, method: Callable[..., _T], *args: Any) -> _T:
- """Synchronously run the given method on the selector thread.
- """
- fut = concurrent.futures.Future() # type: concurrent.futures.Future[_T]
+ def _consume_waker(self) -> None:
+ try:
+ self._waker_r.recv(1024)
+ except BlockingIOError:
+ pass
- def wrapper() -> None:
- try:
- result = method(*args)
- except Exception as e:
- fut.set_exception(e)
- else:
- fut.set_result(result)
+ def _run_select(
+ self, to_read: List[int], to_write: List[int]
+ ) -> Tuple[List[int], List[int]]:
+ # We use the simpler interface of the select module instead of
+ # the more stateful interface in the selectors module because
+ # this class is only intended for use on windows, where
+ # select.select is the only option. The selector interface
+ # does not have well-documented thread-safety semantics that
+ # we can rely on so ensuring proper synchronization would be
+ # tricky.
+ try:
+ # On windows, selecting on a socket for write will not
+ # return the socket when there is an error (but selecting
+ # for reads works). Also select for errors when selecting
+ # for writes, and merge the results.
+ #
+ # This pattern is also used in
+ # https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317
+ rs, ws, xs = select.select(to_read, to_write, to_write)
+ ws = ws + xs
+ except OSError as e:
+ # After remove_reader or remove_writer is called, the file
+ # descriptor may subsequently be closed on the event loop
+ # thread. It's possible that this select thread hasn't
+ # gotten into the select system call by the time that
+ # happens in which case (at least on macOS), select may
+ # raise a "bad file descriptor" error. If we get that
+ # error, check and see if we're also being woken up by
+ # polling the waker alone. If we are, just return to the
+ # event loop and we'll get the updated set of file
+ # descriptors on the next iteration. Otherwise, raise the
+ # original error.
+ if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF):
+ rs, _, _ = select.select([self._waker_r.fileno()], [], [], 0)
+ if rs:
+ return rs, []
+ raise
+ return rs, ws
+
+ def _start_selector(self) -> None:
+ self._selector_task = asyncio.create_task(self._select_loop()) # type: ignore
+
+ async def _select_loop(self) -> None:
+ while True:
+ # Capture reader and writer sets here in the event loop
+ # thread to avoid any problems with concurrent
+ # modification while the select loop uses them.
+ rs, ws = await self.run_in_executor(
+ self._executor,
+ self._run_select,
+ list(self._readers.keys()),
+ list(self._writers.keys()),
+ )
+ for r in rs:
+ self._handle_event(r, self._readers)
+ for w in ws:
+ self._handle_event(w, self._writers)
- self._selector_loop.call_soon_threadsafe(wrapper)
- return fut.result()
+ def _handle_event(
+ self, fd: "_FileDescriptorLike", cb_map: Dict["_FileDescriptorLike", Callable],
+ ) -> None:
+ try:
+ callback = cb_map[fd]
+ except KeyError:
+ return
+ callback()
def add_reader(
- self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
+ self, fd: "_FileDescriptorLike", callback: Callable[..., None], *args: Any
) -> None:
- seq = next(_seq_gen)
- self._reader_seq[fd] = seq
-
- def wrapper() -> None:
- if self._reader_seq.get(fd, None) != seq:
- return
- callback(*args)
-
- return self._run_on_selector(
- self._selector_loop.add_reader,
- fd,
- self._real_loop.call_soon_threadsafe,
- wrapper,
- )
+ self._readers[fd] = functools.partial(callback, *args)
+ self._wake_selector()
def add_writer(
- self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
+ self, fd: "_FileDescriptorLike", callback: Callable[..., None], *args: Any
) -> None:
- seq = next(_seq_gen)
- self._writer_seq[fd] = seq
-
- def wrapper() -> None:
- if self._writer_seq.get(fd, None) != seq:
- return
- callback(*args)
-
- return self._run_on_selector(
- self._selector_loop.add_writer,
- fd,
- self._real_loop.call_soon_threadsafe,
- wrapper,
- )
+ self._writers[fd] = functools.partial(callback, *args)
+ self._wake_selector()
- def remove_reader(self, fd: _FileDescriptorLike) -> None:
- del self._reader_seq[fd]
- return self._run_on_selector(self._selector_loop.remove_reader, fd)
+ def remove_reader(self, fd: "_FileDescriptorLike") -> None:
+ del self._readers[fd]
+ self._wake_selector()
- def remove_writer(self, fd: _FileDescriptorLike) -> None:
- del self._writer_seq[fd]
- return self._run_on_selector(self._selector_loop.remove_writer, fd)
+ def remove_writer(self, fd: "_FileDescriptorLike") -> None:
+ del self._writers[fd]
+ self._wake_selector()