Windows. Use the `~asyncio.SelectorEventLoop` instead.
"""
+import asyncio
+import atexit
import concurrent.futures
import functools
+import itertools
import sys
-
-from threading import get_ident
+import threading
+import typing
from tornado.gen import convert_yielded
from tornado.ioloop import IOLoop, _Selectable
-import asyncio
-
-import typing
from typing import Any, TypeVar, Awaitable, Callable, Union, Optional
if typing.TYPE_CHECKING:
_T = TypeVar("_T")
+class _HasFileno(typing.Protocol):
+ def fileno(self) -> int:
+ pass
+
+
+_FileDescriptorLike = Union[int, _HasFileno]
+
+
+_seq_gen = itertools.count()
+
+_atexit_run = False
+
+
+def _atexit_callback() -> None:
+ global _atexit_run
+ _atexit_run = True
+
+
+atexit.register(_atexit_callback)
+
+
class BaseAsyncIOLoop(IOLoop):
def initialize( # type: ignore
self, asyncio_loop: asyncio.AbstractEventLoop, **kwargs: Any
) -> None:
+ # asyncio_loop is always the real underlying IOLoop. This is used in
+ # ioloop.py to maintain the asyncio-to-ioloop mappings.
self.asyncio_loop = asyncio_loop
+ # selector_loop is an event loop that implements the add_reader family of
+ # methods. Usually the same as asyncio_loop but differs on platforms such
+ # as windows where the default event loop does not implement these methods.
+ self.selector_loop = asyncio_loop
+ if hasattr(asyncio, "ProactorEventLoop") and isinstance(
+ asyncio_loop, asyncio.ProactorEventLoop
+ ):
+ # Ignore this line for mypy because the abstract method checker
+ # doesn't understand dynamic proxies.
+ self.selector_loop = AddThreadSelectorEventLoop(asyncio_loop) # type: ignore
# Maps fd to (fileobj, handler function) pair (as in IOLoop.add_handler)
self.handlers = {} # type: Dict[int, Tuple[Union[int, _Selectable], Callable]]
# Set of fds listening for reads/writes
super(BaseAsyncIOLoop, self).initialize(**kwargs)
def assign_thread_identity() -> None:
- self._thread_identity = get_ident()
+ self._thread_identity = threading.get_ident()
self.add_callback(assign_thread_identity)
# assume it was closed from the asyncio side, and do this
# cleanup for us, leading to a KeyError.
del IOLoop._ioloop_for_asyncio[self.asyncio_loop]
+ if self.selector_loop is not self.asyncio_loop:
+ self.selector_loop.close()
self.asyncio_loop.close()
def add_handler(
raise ValueError("fd %s added twice" % fd)
self.handlers[fd] = (fileobj, handler)
if events & IOLoop.READ:
- self.asyncio_loop.add_reader(fd, self._handle_events, fd, IOLoop.READ)
+ self.selector_loop.add_reader(fd, self._handle_events, fd, IOLoop.READ)
self.readers.add(fd)
if events & IOLoop.WRITE:
- self.asyncio_loop.add_writer(fd, self._handle_events, fd, IOLoop.WRITE)
+ self.selector_loop.add_writer(fd, self._handle_events, fd, IOLoop.WRITE)
self.writers.add(fd)
def update_handler(self, fd: Union[int, _Selectable], events: int) -> None:
fd, fileobj = self.split_fd(fd)
if events & IOLoop.READ:
if fd not in self.readers:
- self.asyncio_loop.add_reader(fd, self._handle_events, fd, IOLoop.READ)
+ self.selector_loop.add_reader(fd, self._handle_events, fd, IOLoop.READ)
self.readers.add(fd)
else:
if fd in self.readers:
- self.asyncio_loop.remove_reader(fd)
+ self.selector_loop.remove_reader(fd)
self.readers.remove(fd)
if events & IOLoop.WRITE:
if fd not in self.writers:
- self.asyncio_loop.add_writer(fd, self._handle_events, fd, IOLoop.WRITE)
+ self.selector_loop.add_writer(fd, self._handle_events, fd, IOLoop.WRITE)
self.writers.add(fd)
else:
if fd in self.writers:
- self.asyncio_loop.remove_writer(fd)
+ self.selector_loop.remove_writer(fd)
self.writers.remove(fd)
def remove_handler(self, fd: Union[int, _Selectable]) -> None:
if fd not in self.handlers:
return
if fd in self.readers:
- self.asyncio_loop.remove_reader(fd)
+ self.selector_loop.remove_reader(fd)
self.readers.remove(fd)
if fd in self.writers:
- self.asyncio_loop.remove_writer(fd)
+ self.selector_loop.remove_writer(fd)
self.writers.remove(fd)
del self.handlers[fd]
timeout.cancel() # type: ignore
def add_callback(self, callback: Callable, *args: Any, **kwargs: Any) -> None:
- if get_ident() == self._thread_identity:
+ if threading.get_ident() == self._thread_identity:
call_soon = self.asyncio_loop.call_soon
else:
call_soon = self.asyncio_loop.call_soon_threadsafe
# add_callback that completes without error will
# eventually execute).
pass
+ except AttributeError:
+ # ProactorEventLoop may raise this instead of RuntimeError
+ # if call_soon_threadsafe races with a call to close().
+ # Swallow it too for consistency.
+ pass
def add_callback_from_signal(
self, callback: Callable, *args: Any, **kwargs: Any
self,
executor: Optional[concurrent.futures.Executor],
func: Callable[..., _T],
- *args: Any
+ *args: Any,
) -> Awaitable[_T]:
return self.asyncio_loop.run_in_executor(executor, func, *args)
loop = self.new_event_loop()
self.set_event_loop(loop)
return loop
+
+
+class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
+ """Wrap an event loop to add implementations of the ``add_reader`` method family.
+
+ Instances of this class start a second thread to run a selector-based event loop.
+ This thread is completely hidden from the user; all callbacks are run on the
+ wrapped event loop's thread.
+
+ This class is used automatically by Tornado; applications should not need
+ to refer to it directly.
+
+ It is safe to wrap any event loop with this class, although it only makes sense
+ for event loops that do not implement the ``add_reader`` family of methods
+ themselves (i.e. ``WindowsProactorEventLoop``)
+
+ Closing the ``AddThreadSelectorEventLoop`` does not close the wrapped event loop.
+ """
+
+ # This class is a __getattribute__-based proxy. All attributes other than those
+ # in this set are proxied through to the underlying loop.
+ MY_ATTRIBUTES = {
+ "add_reader",
+ "add_writer",
+ "remove_reader",
+ "remove_writer",
+ "close",
+ "_real_loop",
+ "_selector_loop",
+ "_selector_thread",
+ "_run_on_selector",
+ "_handle_event_from_selector",
+ "_reader_seq",
+ "_writer_seq",
+ }
+
+ def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
+ self._real_loop = real_loop
+
+ # Sequence numbers allow us to detect races between the selector thread
+ # and the main thread, such as when a handler for a file descriptor has
+ # been removed and re-added. These maps go from file descriptor to a
+ # sequence number.
+ self._reader_seq = {} # type: Dict[_FileDescriptorLike, int]
+ self._writer_seq = {} # type: Dict[_FileDescriptorLike, int]
+
+ fut = (
+ concurrent.futures.Future()
+ ) # type: concurrent.futures.Future[asyncio.AbstractEventLoop]
+
+ def f() -> None:
+ loop = asyncio.SelectorEventLoop()
+ fut.set_result(loop)
+ loop.run_forever()
+ loop.close()
+
+ self._selector_thread = threading.Thread(target=f)
+ # Must be a daemon in case this event loop is not explicitly closed
+ # (often the case for the main loop).
+ self._selector_thread.daemon = True
+ self._selector_thread.start()
+ self._selector_loop = fut.result()
+
+ def close(self) -> None:
+ self._selector_loop.call_soon_threadsafe(self._selector_loop.stop)
+ if not _atexit_run:
+ # Shutdown is tricky: Our thread must be set as a daemon so that it
+ # doesn't prevent shutdown in the common case of an unclosed main
+ # loop. But daemon threads are halted relatively early in the
+ # interpreter shutdown process; once this happens attempts to join
+ # them will block forever.
+ #
+ # I can't find formal documentation of this, but as of cpython 3.8
+ # the shutdown order is
+ # 1. atexit functions
+ # 2. daemon threads halt
+ # 3. global destructors run
+ #
+ # If we're running after atexit functions, we're probably in a
+ # global destructor. But in any case, we know that the process is
+ # about to exit and it's no longer necessary to join our daemon
+ # thread. (Is it ever necessary to join it? Probably not but it
+ # feels dirty not to)
+ self._selector_thread.join()
+ self._real_loop.close()
+
+ def __getattribute__(self, name: str) -> Any:
+ if name in AddThreadSelectorEventLoop.MY_ATTRIBUTES:
+ return super().__getattribute__(name)
+ return getattr(self._real_loop, name)
+
+ def _run_on_selector(self, method: Callable[..., _T], *args: Any) -> _T:
+ """Synchronously run the given method on the selector thread.
+ """
+ fut = concurrent.futures.Future() # type: concurrent.futures.Future[_T]
+
+ def wrapper() -> None:
+ try:
+ result = method(*args)
+ except Exception as e:
+ fut.set_exception(e)
+ else:
+ fut.set_result(result)
+
+ self._selector_loop.call_soon_threadsafe(wrapper)
+ return fut.result()
+
+ def add_reader(
+ self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
+ ) -> None:
+ seq = next(_seq_gen)
+ self._reader_seq[fd] = seq
+
+ def wrapper() -> None:
+ if self._reader_seq.get(fd, None) != seq:
+ return
+ callback(*args)
+
+ return self._run_on_selector(
+ self._selector_loop.add_reader,
+ fd,
+ self._real_loop.call_soon_threadsafe,
+ wrapper,
+ )
+
+ def add_writer(
+ self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
+ ) -> None:
+ seq = next(_seq_gen)
+ self._writer_seq[fd] = seq
+
+ def wrapper() -> None:
+ if self._writer_seq.get(fd, None) != seq:
+ return
+ callback(*args)
+
+ return self._run_on_selector(
+ self._selector_loop.add_writer,
+ fd,
+ self._real_loop.call_soon_threadsafe,
+ wrapper,
+ )
+
+ def remove_reader(self, fd: _FileDescriptorLike) -> None:
+ del self._reader_seq[fd]
+ return self._run_on_selector(self._selector_loop.remove_reader, fd)
+
+ def remove_writer(self, fd: _FileDescriptorLike) -> None:
+ del self._writer_seq[fd]
+ return self._run_on_selector(self._selector_loop.remove_writer, fd)