]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
asyncio: Improve support Python 3.8 on Windows
authorBen Darnell <ben@bendarnell.com>
Sat, 22 Feb 2020 23:36:11 +0000 (18:36 -0500)
committerBen Darnell <ben@bendarnell.com>
Wed, 2 Sep 2020 15:10:13 +0000 (11:10 -0400)
This commit removes the need for applications to work around the
backwards-incompatible change to the default event loop. Instead,
Tornado will detect the use of the windows proactor event loop and
start a selector event loop in a separate thread.

Closes #2804

tornado/platform/asyncio.py
tornado/test/__init__.py [deleted file]
tornado/test/twisted_test.py

index 9b44775fa2220a19d35e739fd7557c8d1764c634..1437581248da565438df66fe73b42269ae16350b 100644 (file)
@@ -19,17 +19,17 @@ the same event loop.
    Windows. Use the `~asyncio.SelectorEventLoop` instead.
 """
 
+import asyncio
+import atexit
 import concurrent.futures
 import functools
+import itertools
 import sys
-
-from threading import get_ident
+import threading
+import typing
 from tornado.gen import convert_yielded
 from tornado.ioloop import IOLoop, _Selectable
 
-import asyncio
-
-import typing
 from typing import Any, TypeVar, Awaitable, Callable, Union, Optional
 
 if typing.TYPE_CHECKING:
@@ -38,11 +38,44 @@ if typing.TYPE_CHECKING:
 _T = TypeVar("_T")
 
 
+class _HasFileno(typing.Protocol):
+    def fileno(self) -> int:
+        pass
+
+
+_FileDescriptorLike = Union[int, _HasFileno]
+
+
+_seq_gen = itertools.count()
+
+_atexit_run = False
+
+
+def _atexit_callback() -> None:
+    global _atexit_run
+    _atexit_run = True
+
+
+atexit.register(_atexit_callback)
+
+
 class BaseAsyncIOLoop(IOLoop):
     def initialize(  # type: ignore
         self, asyncio_loop: asyncio.AbstractEventLoop, **kwargs: Any
     ) -> None:
+        # asyncio_loop is always the real underlying IOLoop. This is used in
+        # ioloop.py to maintain the asyncio-to-ioloop mappings.
         self.asyncio_loop = asyncio_loop
+        # selector_loop is an event loop that implements the add_reader family of
+        # methods. Usually the same as asyncio_loop but differs on platforms such
+        # as windows where the default event loop does not implement these methods.
+        self.selector_loop = asyncio_loop
+        if hasattr(asyncio, "ProactorEventLoop") and isinstance(
+            asyncio_loop, asyncio.ProactorEventLoop
+        ):
+            # Ignore this line for mypy because the abstract method checker
+            # doesn't understand dynamic proxies.
+            self.selector_loop = AddThreadSelectorEventLoop(asyncio_loop)  # type: ignore
         # Maps fd to (fileobj, handler function) pair (as in IOLoop.add_handler)
         self.handlers = {}  # type: Dict[int, Tuple[Union[int, _Selectable], Callable]]
         # Set of fds listening for reads/writes
@@ -70,7 +103,7 @@ class BaseAsyncIOLoop(IOLoop):
         super(BaseAsyncIOLoop, self).initialize(**kwargs)
 
         def assign_thread_identity() -> None:
-            self._thread_identity = get_ident()
+            self._thread_identity = threading.get_ident()
 
         self.add_callback(assign_thread_identity)
 
@@ -87,6 +120,8 @@ class BaseAsyncIOLoop(IOLoop):
         # assume it was closed from the asyncio side, and do this
         # cleanup for us, leading to a KeyError.
         del IOLoop._ioloop_for_asyncio[self.asyncio_loop]
+        if self.selector_loop is not self.asyncio_loop:
+            self.selector_loop.close()
         self.asyncio_loop.close()
 
     def add_handler(
@@ -97,29 +132,29 @@ class BaseAsyncIOLoop(IOLoop):
             raise ValueError("fd %s added twice" % fd)
         self.handlers[fd] = (fileobj, handler)
         if events & IOLoop.READ:
-            self.asyncio_loop.add_reader(fd, self._handle_events, fd, IOLoop.READ)
+            self.selector_loop.add_reader(fd, self._handle_events, fd, IOLoop.READ)
             self.readers.add(fd)
         if events & IOLoop.WRITE:
-            self.asyncio_loop.add_writer(fd, self._handle_events, fd, IOLoop.WRITE)
+            self.selector_loop.add_writer(fd, self._handle_events, fd, IOLoop.WRITE)
             self.writers.add(fd)
 
     def update_handler(self, fd: Union[int, _Selectable], events: int) -> None:
         fd, fileobj = self.split_fd(fd)
         if events & IOLoop.READ:
             if fd not in self.readers:
-                self.asyncio_loop.add_reader(fd, self._handle_events, fd, IOLoop.READ)
+                self.selector_loop.add_reader(fd, self._handle_events, fd, IOLoop.READ)
                 self.readers.add(fd)
         else:
             if fd in self.readers:
-                self.asyncio_loop.remove_reader(fd)
+                self.selector_loop.remove_reader(fd)
                 self.readers.remove(fd)
         if events & IOLoop.WRITE:
             if fd not in self.writers:
-                self.asyncio_loop.add_writer(fd, self._handle_events, fd, IOLoop.WRITE)
+                self.selector_loop.add_writer(fd, self._handle_events, fd, IOLoop.WRITE)
                 self.writers.add(fd)
         else:
             if fd in self.writers:
-                self.asyncio_loop.remove_writer(fd)
+                self.selector_loop.remove_writer(fd)
                 self.writers.remove(fd)
 
     def remove_handler(self, fd: Union[int, _Selectable]) -> None:
@@ -127,10 +162,10 @@ class BaseAsyncIOLoop(IOLoop):
         if fd not in self.handlers:
             return
         if fd in self.readers:
-            self.asyncio_loop.remove_reader(fd)
+            self.selector_loop.remove_reader(fd)
             self.readers.remove(fd)
         if fd in self.writers:
-            self.asyncio_loop.remove_writer(fd)
+            self.selector_loop.remove_writer(fd)
             self.writers.remove(fd)
         del self.handlers[fd]
 
@@ -169,7 +204,7 @@ class BaseAsyncIOLoop(IOLoop):
         timeout.cancel()  # type: ignore
 
     def add_callback(self, callback: Callable, *args: Any, **kwargs: Any) -> None:
-        if get_ident() == self._thread_identity:
+        if threading.get_ident() == self._thread_identity:
             call_soon = self.asyncio_loop.call_soon
         else:
             call_soon = self.asyncio_loop.call_soon_threadsafe
@@ -182,6 +217,11 @@ class BaseAsyncIOLoop(IOLoop):
             # add_callback that completes without error will
             # eventually execute).
             pass
+        except AttributeError:
+            # ProactorEventLoop may raise this instead of RuntimeError
+            # if call_soon_threadsafe races with a call to close().
+            # Swallow it too for consistency.
+            pass
 
     def add_callback_from_signal(
         self, callback: Callable, *args: Any, **kwargs: Any
@@ -197,7 +237,7 @@ class BaseAsyncIOLoop(IOLoop):
         self,
         executor: Optional[concurrent.futures.Executor],
         func: Callable[..., _T],
-        *args: Any
+        *args: Any,
     ) -> Awaitable[_T]:
         return self.asyncio_loop.run_in_executor(executor, func, *args)
 
@@ -344,3 +384,153 @@ class AnyThreadEventLoopPolicy(_BasePolicy):  # type: ignore
             loop = self.new_event_loop()
             self.set_event_loop(loop)
             return loop
+
+
+class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
+    """Wrap an event loop to add implementations of the ``add_reader`` method family.
+
+    Instances of this class start a second thread to run a selector-based event loop.
+    This thread is completely hidden from the user; all callbacks are run on the
+    wrapped event loop's thread.
+
+    This class is used automatically by Tornado; applications should not need
+    to refer to it directly.
+
+    It is safe to wrap any event loop with this class, although it only makes sense
+    for event loops that do not implement the ``add_reader`` family of methods
+    themselves (i.e. ``WindowsProactorEventLoop``)
+
+    Closing the ``AddThreadSelectorEventLoop`` does not close the wrapped event loop.
+    """
+
+    # This class is a __getattribute__-based proxy. All attributes other than those
+    # in this set are proxied through to the underlying loop.
+    MY_ATTRIBUTES = {
+        "add_reader",
+        "add_writer",
+        "remove_reader",
+        "remove_writer",
+        "close",
+        "_real_loop",
+        "_selector_loop",
+        "_selector_thread",
+        "_run_on_selector",
+        "_handle_event_from_selector",
+        "_reader_seq",
+        "_writer_seq",
+    }
+
+    def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
+        self._real_loop = real_loop
+
+        # Sequence numbers allow us to detect races between the selector thread
+        # and the main thread, such as when a handler for a file descriptor has
+        # been removed and re-added. These maps go from file descriptor to a
+        # sequence number.
+        self._reader_seq = {}  # type: Dict[_FileDescriptorLike, int]
+        self._writer_seq = {}  # type: Dict[_FileDescriptorLike, int]
+
+        fut = (
+            concurrent.futures.Future()
+        )  # type: concurrent.futures.Future[asyncio.AbstractEventLoop]
+
+        def f() -> None:
+            loop = asyncio.SelectorEventLoop()
+            fut.set_result(loop)
+            loop.run_forever()
+            loop.close()
+
+        self._selector_thread = threading.Thread(target=f)
+        # Must be a daemon in case this event loop is not explicitly closed
+        # (often the case for the main loop).
+        self._selector_thread.daemon = True
+        self._selector_thread.start()
+        self._selector_loop = fut.result()
+
+    def close(self) -> None:
+        self._selector_loop.call_soon_threadsafe(self._selector_loop.stop)
+        if not _atexit_run:
+            # Shutdown is tricky: Our thread must be set as a daemon so that it
+            # doesn't prevent shutdown in the common case of an unclosed main
+            # loop. But daemon threads are halted relatively early in the
+            # interpreter shutdown process; once this happens attempts to join
+            # them will block forever.
+            #
+            # I can't find formal documentation of this, but as of cpython 3.8
+            # the shutdown order is
+            # 1. atexit functions
+            # 2. daemon threads halt
+            # 3. global destructors run
+            #
+            # If we're running after atexit functions, we're probably in a
+            # global destructor. But in any case, we know that the process is
+            # about to exit and it's no longer necessary to join our daemon
+            # thread. (Is it ever necessary to join it? Probably not but it
+            # feels dirty not to)
+            self._selector_thread.join()
+        self._real_loop.close()
+
+    def __getattribute__(self, name: str) -> Any:
+        if name in AddThreadSelectorEventLoop.MY_ATTRIBUTES:
+            return super().__getattribute__(name)
+        return getattr(self._real_loop, name)
+
+    def _run_on_selector(self, method: Callable[..., _T], *args: Any) -> _T:
+        """Synchronously run the given method on the selector thread.
+        """
+        fut = concurrent.futures.Future()  # type: concurrent.futures.Future[_T]
+
+        def wrapper() -> None:
+            try:
+                result = method(*args)
+            except Exception as e:
+                fut.set_exception(e)
+            else:
+                fut.set_result(result)
+
+        self._selector_loop.call_soon_threadsafe(wrapper)
+        return fut.result()
+
+    def add_reader(
+        self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
+    ) -> None:
+        seq = next(_seq_gen)
+        self._reader_seq[fd] = seq
+
+        def wrapper() -> None:
+            if self._reader_seq.get(fd, None) != seq:
+                return
+            callback(*args)
+
+        return self._run_on_selector(
+            self._selector_loop.add_reader,
+            fd,
+            self._real_loop.call_soon_threadsafe,
+            wrapper,
+        )
+
+    def add_writer(
+        self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
+    ) -> None:
+        seq = next(_seq_gen)
+        self._writer_seq[fd] = seq
+
+        def wrapper() -> None:
+            if self._writer_seq.get(fd, None) != seq:
+                return
+            callback(*args)
+
+        return self._run_on_selector(
+            self._selector_loop.add_writer,
+            fd,
+            self._real_loop.call_soon_threadsafe,
+            wrapper,
+        )
+
+    def remove_reader(self, fd: _FileDescriptorLike) -> None:
+        del self._reader_seq[fd]
+        return self._run_on_selector(self._selector_loop.remove_reader, fd)
+
+    def remove_writer(self, fd: _FileDescriptorLike) -> None:
+        del self._writer_seq[fd]
+        return self._run_on_selector(self._selector_loop.remove_writer, fd)
diff --git a/tornado/test/__init__.py b/tornado/test/__init__.py
deleted file mode 100644 (file)
index bfc2656..0000000
+++ /dev/null
@@ -1,8 +0,0 @@
-import asyncio
-import sys
-
-# Use the selector event loop on windows. Do this in tornado/test/__init__.py
-# instead of runtests.py so it happens no matter how the test is run (such as
-# through editor integrations).
-if sys.platform == "win32" and hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
-    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())  # type: ignore
index 2fc3f8e30e0e5bdfce5bf7f4524b76300952b22b..661953d73e9431bae80c60795e602e5dd98234f4 100644 (file)
@@ -75,6 +75,13 @@ def restore_signal_handlers(saved):
 class CompatibilityTests(unittest.TestCase):
     def setUp(self):
         self.saved_signals = save_signal_handlers()
+        self.saved_policy = asyncio.get_event_loop_policy()
+        if hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
+            # Twisted requires a selector event loop, even if Tornado is
+            # doing its own tricks in AsyncIOLoop to support proactors.
+            # Setting an AddThreadSelectorEventLoop exposes various edge
+            # cases so just use a regular selector.
+            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())  # type: ignore
         self.io_loop = IOLoop()
         self.io_loop.make_current()
         self.reactor = AsyncioSelectorReactor()
@@ -83,6 +90,7 @@ class CompatibilityTests(unittest.TestCase):
         self.reactor.disconnectAll()
         self.io_loop.clear_current()
         self.io_loop.close(all_fds=True)
+        asyncio.set_event_loop_policy(self.saved_policy)
         restore_signal_handlers(self.saved_signals)
 
     def start_twisted_server(self):