]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
asyncio: Rework AddThreadSelectorEventLoop
authorBen Darnell <ben@cockroachlabs.com>
Mon, 6 Jul 2020 00:48:16 +0000 (20:48 -0400)
committerBen Darnell <ben@bendarnell.com>
Wed, 2 Sep 2020 15:10:13 +0000 (11:10 -0400)
Running a whole event loop on the other thread leads to tricky
synchronization problems. Instead, keep as much as possible on the
main thread, and call out to a second thread only for the blocking
select system call itself.

tornado/platform/asyncio.py
tornado/test/ioloop_test.py
tornado/test/iostream_test.py

index 1437581248da565438df66fe73b42269ae16350b..ccbf4caf80ddf65f116e97fea04ba40cb61473d1 100644 (file)
@@ -22,40 +22,51 @@ the same event loop.
 import asyncio
 import atexit
 import concurrent.futures
+import errno
 import functools
-import itertools
+import select
+import socket
 import sys
 import threading
 import typing
 from tornado.gen import convert_yielded
 from tornado.ioloop import IOLoop, _Selectable
 
-from typing import Any, TypeVar, Awaitable, Callable, Union, Optional
+from typing import Any, TypeVar, Awaitable, Callable, Union, Optional, List, Tuple, Dict
 
 if typing.TYPE_CHECKING:
-    from typing import Set, Dict, Tuple  # noqa: F401
-
-_T = TypeVar("_T")
-
-
-class _HasFileno(typing.Protocol):
-    def fileno(self) -> int:
-        pass
+    from typing import Set  # noqa: F401
+    from typing_extensions import Protocol
 
+    class _HasFileno(Protocol):
+        def fileno(self) -> int:
+            pass
 
-_FileDescriptorLike = Union[int, _HasFileno]
+    _FileDescriptorLike = Union[int, _HasFileno]
 
+_T = TypeVar("_T")
 
-_seq_gen = itertools.count()
 
-_atexit_run = False
+# Collection of sockets to write to at shutdown to wake up any selector threads.
+_waker_sockets = set()  # type: Set[socket.socket]
 
 
 def _atexit_callback() -> None:
-    global _atexit_run
-    _atexit_run = True
+    for fd in _waker_sockets:
+        try:
+            fd.send(b"a")
+        except BlockingIOError:
+            pass
 
 
+# atexit callbacks are run in LIFO order. Our callback must run before
+# ThreadPoolExecutor's or it will deadlock (the pool's threads can't
+# finish their work items until we write to their waker sockets). In
+# recent versions of Python the thread pool atexit callback is
+# registered in a getattr hook the first time TPE is *referenced*
+# (instead of older versions of python where it was registered when
+# concurrent.futures was imported).
+concurrent.futures.ThreadPoolExecutor
 atexit.register(_atexit_callback)
 
 
@@ -71,7 +82,7 @@ class BaseAsyncIOLoop(IOLoop):
         # as windows where the default event loop does not implement these methods.
         self.selector_loop = asyncio_loop
         if hasattr(asyncio, "ProactorEventLoop") and isinstance(
-            asyncio_loop, asyncio.ProactorEventLoop
+            asyncio_loop, asyncio.ProactorEventLoop  # type: ignore
         ):
             # Ignore this line for mypy because the abstract method checker
             # doesn't understand dynamic proxies.
@@ -237,7 +248,7 @@ class BaseAsyncIOLoop(IOLoop):
         self,
         executor: Optional[concurrent.futures.Executor],
         func: Callable[..., _T],
-        *args: Any,
+        *args: Any
     ) -> Awaitable[_T]:
         return self.asyncio_loop.run_in_executor(executor, func, *args)
 
@@ -389,9 +400,9 @@ class AnyThreadEventLoopPolicy(_BasePolicy):  # type: ignore
 class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
     """Wrap an event loop to add implementations of the ``add_reader`` method family.
 
-    Instances of this class start a second thread to run a selector-based event loop.
-    This thread is completely hidden from the user; all callbacks are run on the
-    wrapped event loop's thread.
+    Instances of this class start a second thread to run a selector.
+    This thread is completely hidden from the user; all callbacks are
+    run on the wrapped event loop's thread.
 
     This class is used automatically by Tornado; applications should not need
     to refer to it directly.
@@ -400,137 +411,182 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
     for event loops that do not implement the ``add_reader`` family of methods
     themselves (i.e. ``WindowsProactorEventLoop``)
 
-    Closing the ``AddThreadSelectorEventLoop`` does not close the wrapped event loop.
+    Closing the ``AddThreadSelectorEventLoop`` also closes the wrapped event loop.
+
     """
 
     # This class is a __getattribute__-based proxy. All attributes other than those
     # in this set are proxied through to the underlying loop.
     MY_ATTRIBUTES = {
+        "_consume_waker",
+        "_executor",
+        "_handle_event",
+        "_readers",
+        "_real_loop",
+        "_run_select",
+        "_select_loop",
+        "_selector_task",
+        "_start_selector",
+        "_wake_selector",
+        "_waker_r",
+        "_waker_w",
+        "_writers",
         "add_reader",
         "add_writer",
+        "close",
         "remove_reader",
         "remove_writer",
-        "close",
-        "_real_loop",
-        "_selector_loop",
-        "_selector_thread",
-        "_run_on_selector",
-        "_handle_event_from_selector",
-        "_reader_seq",
-        "_writer_seq",
     }
 
+    def __getattribute__(self, name: str) -> Any:
+        if name in AddThreadSelectorEventLoop.MY_ATTRIBUTES:
+            return super().__getattribute__(name)
+        return getattr(self._real_loop, name)
+
     def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
         self._real_loop = real_loop
-
-        # Sequence numbers allow us to detect races between the selector thread
-        # and the main thread, such as when a handler for a file descriptor has
-        # been removed and re-added. These maps go from file descriptor to a
-        # sequence number.
-        self._reader_seq = {}  # type: Dict[_FileDescriptorLike, int]
-        self._writer_seq = {}  # type: Dict[_FileDescriptorLike, int]
-
-        fut = (
-            concurrent.futures.Future()
-        )  # type: concurrent.futures.Future[asyncio.AbstractEventLoop]
-
-        def f() -> None:
-            loop = asyncio.SelectorEventLoop()
-            fut.set_result(loop)
-            loop.run_forever()
-            loop.close()
-
-        self._selector_thread = threading.Thread(target=f)
-        # Must be a daemon in case this event loop is not explicitly closed
-        # (often the case for the main loop).
-        self._selector_thread.daemon = True
-        self._selector_thread.start()
-        self._selector_loop = fut.result()
+        # Create our own executor to ensure we always have a thread
+        # available (we'll keep it 100% busy) instead of contending
+        # with the application for a thread in the default executor.
+        self._executor = concurrent.futures.ThreadPoolExecutor(1)
+        self._selector_task = None
+        # Start the select loop once the loop is started.
+        self._real_loop.call_soon(self._start_selector)
+
+        self._readers = {}  # type: Dict[_FileDescriptorLike, Callable]
+        self._writers = {}  # type: Dict[_FileDescriptorLike, Callable]
+
+        # Writing to _waker_w will wake up the selector thread, which
+        # watches for _waker_r to be readable.
+        self._waker_r, self._waker_w = socket.socketpair()
+        self._waker_r.setblocking(False)
+        self._waker_w.setblocking(False)
+        _waker_sockets.add(self._waker_w)
+        self.add_reader(self._waker_r, self._consume_waker)
+
+    def __del__(self) -> None:
+        # If the top-level application code uses asyncio interfaces to
+        # start and stop the event loop, no objects created in Tornado
+        # can get a clean shutdown notification. If we're just left to
+        # be GC'd, we must explicitly close our sockets to avoid
+        # logging warnings.
+        _waker_sockets.discard(self._waker_w)
+        self._waker_r.close()
+        self._waker_w.close()
 
     def close(self) -> None:
-        self._selector_loop.call_soon_threadsafe(self._selector_loop.stop)
-        if not _atexit_run:
-            # Shutdown is tricky: Our thread must be set as a daemon so that it
-            # doesn't prevent shutdown in the common case of an unclosed main
-            # loop. But daemon threads are halted relatively early in the
-            # interpreter shutdown process; once this happens attempts to join
-            # them will block forever.
-            #
-            # I can't find formal documentation of this, but as of cpython 3.8
-            # the shutdown order is
-            # 1. atexit functions
-            # 2. daemon threads halt
-            # 3. global destructors run
-            #
-            # If we're running after atexit functions, we're probably in a
-            # global destructor. But in any case, we know that the process is
-            # about to exit and it's no longer necessary to join our daemon
-            # thread. (Is it ever necessary to join it? Probably not but it
-            # feels dirty not to)
-            self._selector_thread.join()
+        if self._selector_task is not None:
+            self._selector_task.cancel()
+            try:
+                # Cancellation is not immediate (coroutines are given
+                # a chance to catch the error and recover) so we must
+                # restart the loop here to allow our selector task to
+                # finish and avoid logging warnings at shutdown.
+                self._real_loop.run_until_complete(self._selector_task)
+            except asyncio.CancelledError:
+                pass
+        self._wake_selector()
+        self._executor.shutdown()
+        _waker_sockets.discard(self._waker_w)
+        self._waker_r.close()
+        self._waker_w.close()
         self._real_loop.close()
 
-    def __getattribute__(self, name: str) -> Any:
-        if name in AddThreadSelectorEventLoop.MY_ATTRIBUTES:
-            return super().__getattribute__(name)
-        return getattr(self._real_loop, name)
+    def _wake_selector(self) -> None:
+        try:
+            self._waker_w.send(b"a")
+        except BlockingIOError:
+            pass
 
-    def _run_on_selector(self, method: Callable[..., _T], *args: Any) -> _T:
-        """Synchronously run the given method on the selector thread.
-        """
-        fut = concurrent.futures.Future()  # type: concurrent.futures.Future[_T]
+    def _consume_waker(self) -> None:
+        try:
+            self._waker_r.recv(1024)
+        except BlockingIOError:
+            pass
 
-        def wrapper() -> None:
-            try:
-                result = method(*args)
-            except Exception as e:
-                fut.set_exception(e)
-            else:
-                fut.set_result(result)
+    def _run_select(
+        self, to_read: List[int], to_write: List[int]
+    ) -> Tuple[List[int], List[int]]:
+        # We use the simpler interface of the select module instead of
+        # the more stateful interface in the selectors module because
+        # this class is only intended for use on windows, where
+        # select.select is the only option. The selector interface
+        # does not have well-documented thread-safety semantics that
+        # we can rely on so ensuring proper synchronization would be
+        # tricky.
+        try:
+            # On windows, selecting on a socket for write will not
+            # return the socket when there is an error (but selecting
+            # for reads works). Also select for errors when selecting
+            # for writes, and merge the results.
+            #
+            # This pattern is also used in
+            # https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317
+            rs, ws, xs = select.select(to_read, to_write, to_write)
+            ws = ws + xs
+        except OSError as e:
+            # After remove_reader or remove_writer is called, the file
+            # descriptor may subsequently be closed on the event loop
+            # thread. It's possible that this select thread hasn't
+            # gotten into the select system call by the time that
+            # happens in which case (at least on macOS), select may
+            # raise a "bad file descriptor" error. If we get that
+            # error, check and see if we're also being woken up by
+            # polling the waker alone. If we are, just return to the
+            # event loop and we'll get the updated set of file
+            # descriptors on the next iteration. Otherwise, raise the
+            # original error.
+            if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF):
+                rs, _, _ = select.select([self._waker_r.fileno()], [], [], 0)
+                if rs:
+                    return rs, []
+            raise
+        return rs, ws
+
+    def _start_selector(self) -> None:
+        self._selector_task = asyncio.create_task(self._select_loop())  # type: ignore
+
+    async def _select_loop(self) -> None:
+        while True:
+            # Capture reader and writer sets here in the event loop
+            # thread to avoid any problems with concurrent
+            # modification while the select loop uses them.
+            rs, ws = await self.run_in_executor(
+                self._executor,
+                self._run_select,
+                list(self._readers.keys()),
+                list(self._writers.keys()),
+            )
+            for r in rs:
+                self._handle_event(r, self._readers)
+            for w in ws:
+                self._handle_event(w, self._writers)
 
-        self._selector_loop.call_soon_threadsafe(wrapper)
-        return fut.result()
+    def _handle_event(
+        self, fd: "_FileDescriptorLike", cb_map: Dict["_FileDescriptorLike", Callable],
+    ) -> None:
+        try:
+            callback = cb_map[fd]
+        except KeyError:
+            return
+        callback()
 
     def add_reader(
-        self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
+        self, fd: "_FileDescriptorLike", callback: Callable[..., None], *args: Any
     ) -> None:
-        seq = next(_seq_gen)
-        self._reader_seq[fd] = seq
-
-        def wrapper() -> None:
-            if self._reader_seq.get(fd, None) != seq:
-                return
-            callback(*args)
-
-        return self._run_on_selector(
-            self._selector_loop.add_reader,
-            fd,
-            self._real_loop.call_soon_threadsafe,
-            wrapper,
-        )
+        self._readers[fd] = functools.partial(callback, *args)
+        self._wake_selector()
 
     def add_writer(
-        self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
+        self, fd: "_FileDescriptorLike", callback: Callable[..., None], *args: Any
     ) -> None:
-        seq = next(_seq_gen)
-        self._writer_seq[fd] = seq
-
-        def wrapper() -> None:
-            if self._writer_seq.get(fd, None) != seq:
-                return
-            callback(*args)
-
-        return self._run_on_selector(
-            self._selector_loop.add_writer,
-            fd,
-            self._real_loop.call_soon_threadsafe,
-            wrapper,
-        )
+        self._writers[fd] = functools.partial(callback, *args)
+        self._wake_selector()
 
-    def remove_reader(self, fd: _FileDescriptorLike) -> None:
-        del self._reader_seq[fd]
-        return self._run_on_selector(self._selector_loop.remove_reader, fd)
+    def remove_reader(self, fd: "_FileDescriptorLike") -> None:
+        del self._readers[fd]
+        self._wake_selector()
 
-    def remove_writer(self, fd: _FileDescriptorLike) -> None:
-        del self._writer_seq[fd]
-        return self._run_on_selector(self._selector_loop.remove_writer, fd)
+    def remove_writer(self, fd: "_FileDescriptorLike") -> None:
+        del self._writers[fd]
+        self._wake_selector()
index fa3f75186273511496c3ed0d16582e5a24309e58..978941230dba34f4d03b21aafef61bc4a1285cc1 100644 (file)
@@ -400,9 +400,14 @@ class TestIOLoop(AsyncTestCase):
             client.close()
             server.close()
 
+    @skipIfNonUnix
     @gen_test
     def test_init_close_race(self):
         # Regression test for #2367
+        #
+        # Skipped on windows because of what looks like a bug in the
+        # proactor event loop when started and stopped on non-main
+        # threads.
         def f():
             for i in range(10):
                 loop = IOLoop()
index ecab4cc9e9540304f77521b7a3021a05c8cb094c..67d6438cd76127592cf86a022f87db2fc1e224b7 100644 (file)
@@ -1,6 +1,7 @@
 from tornado.concurrent import Future
 from tornado import gen
 from tornado import netutil
+from tornado.ioloop import IOLoop
 from tornado.iostream import (
     IOStream,
     SSLIOStream,
@@ -12,6 +13,7 @@ from tornado.httputil import HTTPHeaders
 from tornado.locks import Condition, Event
 from tornado.log import gen_log
 from tornado.netutil import ssl_wrap_socket
+from tornado.platform.asyncio import AddThreadSelectorEventLoop
 from tornado.tcpserver import TCPServer
 from tornado.testing import (
     AsyncHTTPTestCase,
@@ -246,11 +248,11 @@ class TestReadWriteMixin(object):
         rs, ws = yield self.make_iostream_pair()
         try:
             ws.write(b"1234")
-            ws.close()
             # Read one byte to make sure the client has received the data.
             # It won't run the close callback as long as there is more buffered
             # data that could satisfy a later read.
             data = yield rs.read_bytes(1)
+            ws.close()
             self.assertEqual(data, b"1")
             data = yield rs.read_until_close()
             self.assertEqual(data, b"234")
@@ -298,6 +300,10 @@ class TestReadWriteMixin(object):
         # in size
         async with self.iostream_pair() as (rs, ws):
             rf = asyncio.ensure_future(rs.read_until(b"done"))
+            # We need to wait for the read_until to actually start. On
+            # windows that's tricky because the selector runs in
+            # another thread; sleeping is the simplest way.
+            await asyncio.sleep(0.1)
             await ws.write(b"x" * 2048)
             ws.write(b"done")
             ws.close()
@@ -796,6 +802,17 @@ class TestIOStreamMixin(TestReadWriteMixin):
         # on socket FDs, but we can't close the socket object normally
         # because we won't get the error we want if the socket knows
         # it's closed.
+        #
+        # This test is also disabled when the
+        # AddThreadSelectorEventLoop is used, because a race between
+        # this thread closing the socket and the selector thread
+        # calling the select system call can make this test flaky.
+        # This event loop implementation is normally only used on
+        # windows, making this check redundant with skipIfNonUnix, but
+        # we sometimes enable it on other platforms for testing.
+        io_loop = IOLoop.current()
+        if isinstance(io_loop.selector_loop, AddThreadSelectorEventLoop):
+            self.skipTest("AddThreadSelectorEventLoop not supported")
         server, client = yield self.make_iostream_pair()
         try:
             os.close(server.socket.fileno())