From: Ben Darnell
Date: Mon, 6 Jul 2020 00:48:16 +0000 (-0400)
Subject: asyncio: Rework AddThreadSelectorEventLoop
X-Git-Tag: v6.1.0b1~9^2~3
X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=5cfe2fc2223fc2b3f6f26d5e6d59167d9b46fdc5;p=thirdparty%2Ftornado.git

asyncio: Rework AddThreadSelectorEventLoop

Running a whole event loop on the other thread leads to tricky
synchronization problems. Instead, keep as much as possible on the main
thread, and call out to a second thread only for the blocking select
system call itself.
---

diff --git a/tornado/platform/asyncio.py b/tornado/platform/asyncio.py
index 143758124..ccbf4caf8 100644
--- a/tornado/platform/asyncio.py
+++ b/tornado/platform/asyncio.py
@@ -22,40 +22,51 @@ the same event loop.
 import asyncio
 import atexit
 import concurrent.futures
+import errno
 import functools
-import itertools
+import select
+import socket
 import sys
 import threading
 import typing

 from tornado.gen import convert_yielded
 from tornado.ioloop import IOLoop, _Selectable

-from typing import Any, TypeVar, Awaitable, Callable, Union, Optional
+from typing import Any, TypeVar, Awaitable, Callable, Union, Optional, List, Tuple, Dict

 if typing.TYPE_CHECKING:
-    from typing import Set, Dict, Tuple  # noqa: F401
-
-_T = TypeVar("_T")
-
-
-class _HasFileno(typing.Protocol):
-    def fileno(self) -> int:
-        pass
+    from typing import Set  # noqa: F401
+
+    from typing_extensions import Protocol
+
+    class _HasFileno(Protocol):
+        def fileno(self) -> int:
+            pass

-_FileDescriptorLike = Union[int, _HasFileno]
+    _FileDescriptorLike = Union[int, _HasFileno]

+_T = TypeVar("_T")

-_seq_gen = itertools.count()
-_atexit_run = False
+# Collection of sockets to write to at shutdown to wake up any selector threads.
+_waker_sockets = set()  # type: Set[socket.socket]


 def _atexit_callback() -> None:
-    global _atexit_run
-    _atexit_run = True
+    for fd in _waker_sockets:
+        try:
+            fd.send(b"a")
+        except BlockingIOError:
+            pass


+# atexit callbacks are run in LIFO order. Our callback must run before
+# ThreadPoolExecutor's or it will deadlock (the pool's threads can't
+# finish their work items until we write to their waker sockets). In
+# recent versions of Python the thread pool atexit callback is
+# registered in a getattr hook the first time TPE is *referenced*
+# (instead of older versions of python where it was registered when
+# concurrent.futures was imported).
+concurrent.futures.ThreadPoolExecutor
 atexit.register(_atexit_callback)

@@ -71,7 +82,7 @@ class BaseAsyncIOLoop(IOLoop):
         # as windows where the default event loop does not implement these methods.
         self.selector_loop = asyncio_loop
         if hasattr(asyncio, "ProactorEventLoop") and isinstance(
-            asyncio_loop, asyncio.ProactorEventLoop
+            asyncio_loop, asyncio.ProactorEventLoop  # type: ignore
         ):
             # Ignore this line for mypy because the abstract method checker
             # doesn't understand dynamic proxies.
@@ -237,7 +248,7 @@ class BaseAsyncIOLoop(IOLoop):
         self,
         executor: Optional[concurrent.futures.Executor],
         func: Callable[..., _T],
-        *args: Any,
+        *args: Any
     ) -> Awaitable[_T]:
         return self.asyncio_loop.run_in_executor(executor, func, *args)

@@ -389,9 +400,9 @@ class AnyThreadEventLoopPolicy(_BasePolicy):  # type: ignore
 class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
     """Wrap an event loop to add implementations of the ``add_reader`` method family.

-    Instances of this class start a second thread to run a selector-based event loop.
-    This thread is completely hidden from the user; all callbacks are run on the
-    wrapped event loop's thread.
+    Instances of this class start a second thread to run a selector.
+    This thread is completely hidden from the user; all callbacks are
+    run on the wrapped event loop's thread.

     This class is used automatically by Tornado; applications should not
     need to refer to it directly.
@@ -400,137 +411,182 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
     for event loops that do not implement the ``add_reader`` family of methods
     themselves (i.e. ``WindowsProactorEventLoop``)

-    Closing the ``AddThreadSelectorEventLoop`` does not close the wrapped event loop.
+    Closing the ``AddThreadSelectorEventLoop`` also closes the wrapped event loop.
+
     """

     # This class is a __getattribute__-based proxy. All attributes other than those
     # in this set are proxied through to the underlying loop.
     MY_ATTRIBUTES = {
+        "_consume_waker",
+        "_executor",
+        "_handle_event",
+        "_readers",
+        "_real_loop",
+        "_run_select",
+        "_select_loop",
+        "_selector_task",
+        "_start_selector",
+        "_wake_selector",
+        "_waker_r",
+        "_waker_w",
+        "_writers",
         "add_reader",
         "add_writer",
+        "close",
         "remove_reader",
         "remove_writer",
-        "close",
-        "_real_loop",
-        "_selector_loop",
-        "_selector_thread",
-        "_run_on_selector",
-        "_handle_event_from_selector",
-        "_reader_seq",
-        "_writer_seq",
     }

+    def __getattribute__(self, name: str) -> Any:
+        if name in AddThreadSelectorEventLoop.MY_ATTRIBUTES:
+            return super().__getattribute__(name)
+        return getattr(self._real_loop, name)
+
     def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
         self._real_loop = real_loop
-
-        # Sequence numbers allow us to detect races between the selector thread
-        # and the main thread, such as when a handler for a file descriptor has
-        # been removed and re-added. These maps go from file descriptor to a
-        # sequence number.
-        self._reader_seq = {}  # type: Dict[_FileDescriptorLike, int]
-        self._writer_seq = {}  # type: Dict[_FileDescriptorLike, int]
-
-        fut = (
-            concurrent.futures.Future()
-        )  # type: concurrent.futures.Future[asyncio.AbstractEventLoop]
-
-        def f() -> None:
-            loop = asyncio.SelectorEventLoop()
-            fut.set_result(loop)
-            loop.run_forever()
-            loop.close()
-
-        self._selector_thread = threading.Thread(target=f)
-        # Must be a daemon in case this event loop is not explicitly closed
-        # (often the case for the main loop).
-        self._selector_thread.daemon = True
-        self._selector_thread.start()
-        self._selector_loop = fut.result()
+        # Create our own executor to ensure we always have a thread
+        # available (we'll keep it 100% busy) instead of contending
+        # with the application for a thread in the default executor.
+        self._executor = concurrent.futures.ThreadPoolExecutor(1)
+        self._selector_task = None
+        # Start the select loop once the loop is started.
+        self._real_loop.call_soon(self._start_selector)
+
+        self._readers = {}  # type: Dict[_FileDescriptorLike, Callable]
+        self._writers = {}  # type: Dict[_FileDescriptorLike, Callable]
+
+        # Writing to _waker_w will wake up the selector thread, which
+        # watches for _waker_r to be readable.
+        self._waker_r, self._waker_w = socket.socketpair()
+        self._waker_r.setblocking(False)
+        self._waker_w.setblocking(False)
+        _waker_sockets.add(self._waker_w)
+        self.add_reader(self._waker_r, self._consume_waker)
+
+    def __del__(self) -> None:
+        # If the top-level application code uses asyncio interfaces to
+        # start and stop the event loop, no objects created in Tornado
+        # can get a clean shutdown notification. If we're just left to
+        # be GC'd, we must explicitly close our sockets to avoid
+        # logging warnings.
+        _waker_sockets.discard(self._waker_w)
+        self._waker_r.close()
+        self._waker_w.close()

     def close(self) -> None:
-        self._selector_loop.call_soon_threadsafe(self._selector_loop.stop)
-        if not _atexit_run:
-            # Shutdown is tricky: Our thread must be set as a daemon so that it
-            # doesn't prevent shutdown in the common case of an unclosed main
-            # loop. But daemon threads are halted relatively early in the
-            # interpreter shutdown process; once this happens attempts to join
-            # them will block forever.
-            #
-            # I can't find formal documentation of this, but as of cpython 3.8
-            # the shutdown order is
-            # 1. atexit functions
-            # 2. daemon threads halt
-            # 3. global destructors run
-            #
-            # If we're running after atexit functions, we're probably in a
-            # global destructor. But in any case, we know that the process is
-            # about to exit and it's no longer necessary to join our daemon
-            # thread. (Is it ever necessary to join it? Probably not but it
-            # feels dirty not to)
-            self._selector_thread.join()
+        if self._selector_task is not None:
+            self._selector_task.cancel()
+            try:
+                # Cancellation is not immediate (coroutines are given
+                # a chance to catch the error and recover) so we must
+                # restart the loop here to allow our selector task to
+                # finish and avoid logging warnings at shutdown.
+                self._real_loop.run_until_complete(self._selector_task)
+            except asyncio.CancelledError:
+                pass
+        self._wake_selector()
+        self._executor.shutdown()
+        _waker_sockets.discard(self._waker_w)
+        self._waker_r.close()
+        self._waker_w.close()
         self._real_loop.close()

-    def __getattribute__(self, name: str) -> Any:
-        if name in AddThreadSelectorEventLoop.MY_ATTRIBUTES:
-            return super().__getattribute__(name)
-        return getattr(self._real_loop, name)
+    def _wake_selector(self) -> None:
+        try:
+            self._waker_w.send(b"a")
+        except BlockingIOError:
+            pass

-    def _run_on_selector(self, method: Callable[..., _T], *args: Any) -> _T:
-        """Synchronously run the given method on the selector thread.
-        """
-        fut = concurrent.futures.Future()  # type: concurrent.futures.Future[_T]
+    def _consume_waker(self) -> None:
+        try:
+            self._waker_r.recv(1024)
+        except BlockingIOError:
+            pass

-        def wrapper() -> None:
-            try:
-                result = method(*args)
-            except Exception as e:
-                fut.set_exception(e)
-            else:
-                fut.set_result(result)
+    def _run_select(
+        self, to_read: List[int], to_write: List[int]
+    ) -> Tuple[List[int], List[int]]:
+        # We use the simpler interface of the select module instead of
+        # the more stateful interface in the selectors module because
+        # this class is only intended for use on windows, where
+        # select.select is the only option. The selector interface
+        # does not have well-documented thread-safety semantics that
+        # we can rely on so ensuring proper synchronization would be
+        # tricky.
+        try:
+            # On windows, selecting on a socket for write will not
+            # return the socket when there is an error (but selecting
+            # for reads works). Also select for errors when selecting
+            # for writes, and merge the results.
+            #
+            # This pattern is also used in
+            # https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317
+            rs, ws, xs = select.select(to_read, to_write, to_write)
+            ws = ws + xs
+        except OSError as e:
+            # After remove_reader or remove_writer is called, the file
+            # descriptor may subsequently be closed on the event loop
+            # thread. It's possible that this select thread hasn't
+            # gotten into the select system call by the time that
+            # happens in which case (at least on macOS), select may
+            # raise a "bad file descriptor" error. If we get that
+            # error, check and see if we're also being woken up by
+            # polling the waker alone. If we are, just return to the
+            # event loop and we'll get the updated set of file
+            # descriptors on the next iteration. Otherwise, raise the
+            # original error.
+            if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF):
+                rs, _, _ = select.select([self._waker_r.fileno()], [], [], 0)
+                if rs:
+                    return rs, []
+            raise
+        return rs, ws
+
+    def _start_selector(self) -> None:
+        self._selector_task = asyncio.create_task(self._select_loop())  # type: ignore
+
+    async def _select_loop(self) -> None:
+        while True:
+            # Capture reader and writer sets here in the event loop
+            # thread to avoid any problems with concurrent
+            # modification while the select loop uses them.
+            rs, ws = await self.run_in_executor(
+                self._executor,
+                self._run_select,
+                list(self._readers.keys()),
+                list(self._writers.keys()),
+            )
+            for r in rs:
+                self._handle_event(r, self._readers)
+            for w in ws:
+                self._handle_event(w, self._writers)

-        self._selector_loop.call_soon_threadsafe(wrapper)
-        return fut.result()
+    def _handle_event(
+        self, fd: "_FileDescriptorLike", cb_map: Dict["_FileDescriptorLike", Callable],
+    ) -> None:
+        try:
+            callback = cb_map[fd]
+        except KeyError:
+            return
+        callback()

     def add_reader(
-        self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
+        self, fd: "_FileDescriptorLike", callback: Callable[..., None], *args: Any
     ) -> None:
-        seq = next(_seq_gen)
-        self._reader_seq[fd] = seq
-
-        def wrapper() -> None:
-            if self._reader_seq.get(fd, None) != seq:
-                return
-            callback(*args)
-
-        return self._run_on_selector(
-            self._selector_loop.add_reader,
-            fd,
-            self._real_loop.call_soon_threadsafe,
-            wrapper,
-        )
+        self._readers[fd] = functools.partial(callback, *args)
+        self._wake_selector()

     def add_writer(
-        self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
+        self, fd: "_FileDescriptorLike", callback: Callable[..., None], *args: Any
     ) -> None:
-        seq = next(_seq_gen)
-        self._writer_seq[fd] = seq
-
-        def wrapper() -> None:
-            if self._writer_seq.get(fd, None) != seq:
-                return
-            callback(*args)
-
-        return self._run_on_selector(
-            self._selector_loop.add_writer,
-            fd,
-            self._real_loop.call_soon_threadsafe,
-            wrapper,
-        )
+        self._writers[fd] = functools.partial(callback, *args)
+        self._wake_selector()

-    def remove_reader(self, fd: _FileDescriptorLike) -> None:
-        del self._reader_seq[fd]
-        return self._run_on_selector(self._selector_loop.remove_reader, fd)
+    def remove_reader(self, fd: "_FileDescriptorLike") -> None:
+        del self._readers[fd]
+        self._wake_selector()

-    def remove_writer(self, fd: _FileDescriptorLike) -> None:
-        del self._writer_seq[fd]
-        return self._run_on_selector(self._selector_loop.remove_writer, fd)
+    def remove_writer(self, fd: "_FileDescriptorLike") -> None:
+        del self._writers[fd]
+        self._wake_selector()
diff --git a/tornado/test/ioloop_test.py b/tornado/test/ioloop_test.py
index fa3f75186..978941230 100644
--- a/tornado/test/ioloop_test.py
+++ b/tornado/test/ioloop_test.py
@@ -400,9 +400,14 @@ class TestIOLoop(AsyncTestCase):
             client.close()
             server.close()

+    @skipIfNonUnix
     @gen_test
     def test_init_close_race(self):
         # Regression test for #2367
+        #
+        # Skipped on windows because of what looks like a bug in the
+        # proactor event loop when started and stopped on non-main
+        # threads.
         def f():
             for i in range(10):
                 loop = IOLoop()
diff --git a/tornado/test/iostream_test.py b/tornado/test/iostream_test.py
index ecab4cc9e..67d6438cd 100644
--- a/tornado/test/iostream_test.py
+++ b/tornado/test/iostream_test.py
@@ -1,6 +1,7 @@
 from tornado.concurrent import Future
 from tornado import gen
 from tornado import netutil
+from tornado.ioloop import IOLoop
 from tornado.iostream import (
     IOStream,
     SSLIOStream,
@@ -12,6 +13,7 @@ from tornado.httputil import HTTPHeaders
 from tornado.locks import Condition, Event
 from tornado.log import gen_log
 from tornado.netutil import ssl_wrap_socket
+from tornado.platform.asyncio import AddThreadSelectorEventLoop
 from tornado.tcpserver import TCPServer
 from tornado.testing import (
     AsyncHTTPTestCase,
@@ -246,11 +248,11 @@ class TestReadWriteMixin(object):
         rs, ws = yield self.make_iostream_pair()
         try:
             ws.write(b"1234")
-            ws.close()
             # Read one byte to make sure the client has received the data.
             # It won't run the close callback as long as there is more buffered
             # data that could satisfy a later read.
             data = yield rs.read_bytes(1)
+            ws.close()
             self.assertEqual(data, b"1")
             data = yield rs.read_until_close()
             self.assertEqual(data, b"234")
@@ -298,6 +300,10 @@ class TestReadWriteMixin(object):
         # in size
         async with self.iostream_pair() as (rs, ws):
             rf = asyncio.ensure_future(rs.read_until(b"done"))
+            # We need to wait for the read_until to actually start. On
+            # windows that's tricky because the selector runs in
+            # another thread; sleeping is the simplest way.
+            await asyncio.sleep(0.1)
             await ws.write(b"x" * 2048)
             ws.write(b"done")
             ws.close()
@@ -796,6 +802,17 @@ class TestIOStreamMixin(TestReadWriteMixin):
         # on socket FDs, but we can't close the socket object normally
         # because we won't get the error we want if the socket knows
         # it's closed.
+        #
+        # This test is also disabled when the
+        # AddThreadSelectorEventLoop is used, because a race between
+        # this thread closing the socket and the selector thread
+        # calling the select system call can make this test flaky.
+        # This event loop implementation is normally only used on
+        # windows, making this check redundant with skipIfNonUnix, but
+        # we sometimes enable it on other platforms for testing.
+        io_loop = IOLoop.current()
+        if isinstance(io_loop.selector_loop, AddThreadSelectorEventLoop):
+            self.skipTest("AddThreadSelectorEventLoop not supported")
         server, client = yield self.make_iostream_pair()
         try:
             os.close(server.socket.fileno())
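
A minimal sketch of the pattern this commit implements, reduced to its moving parts. This is not Tornado code: the names MiniSelectorThread, aclose, and demo are invented for illustration. The fd-to-callback map and every callback stay on the event loop thread, only the blocking select() call runs on a single-thread executor, and a socketpair waker forces a pending select() to return whenever the watched set changes or the shim shuts down.

import asyncio
import concurrent.futures
import select
import socket
from typing import Callable, Dict


class MiniSelectorThread:
    """Toy add_reader shim: callbacks run on the loop thread; only the
    blocking select() call runs on a helper thread."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self.loop = loop
        # fd -> callback; read and mutated only on the event loop thread.
        self.readers: Dict[int, Callable[[], None]] = {}
        self.executor = concurrent.futures.ThreadPoolExecutor(1)
        # Writing one byte to waker_w makes a pending select() return early so
        # changes to self.readers are picked up on the next iteration.
        self.waker_r, self.waker_w = socket.socketpair()
        self.waker_r.setblocking(False)
        self.waker_w.setblocking(False)
        self.readers[self.waker_r.fileno()] = self._consume_waker
        self.task = loop.create_task(self._select_loop())

    def add_reader(self, fd: int, callback: Callable[[], None]) -> None:
        self.readers[fd] = callback
        self._wake()

    def remove_reader(self, fd: int) -> None:
        del self.readers[fd]
        self._wake()

    def _wake(self) -> None:
        try:
            self.waker_w.send(b"a")
        except BlockingIOError:
            pass

    def _consume_waker(self) -> None:
        try:
            self.waker_r.recv(1024)
        except BlockingIOError:
            pass

    async def _select_loop(self) -> None:
        while True:
            # Snapshot the fd list on the loop thread; the helper thread only
            # ever sees this immutable list, never the live dict.
            fds = list(self.readers)
            ready, _, _ = await self.loop.run_in_executor(
                self.executor, select.select, fds, [], []
            )
            for fd in ready:
                callback = self.readers.get(fd)
                if callback is not None:
                    callback()

    async def aclose(self) -> None:
        # Same order as AddThreadSelectorEventLoop.close(): stop the task,
        # wake the helper so its in-flight select() returns, join the thread.
        self.task.cancel()
        try:
            await self.task
        except asyncio.CancelledError:
            pass
        self._wake()
        self.executor.shutdown()
        self.waker_r.close()
        self.waker_w.close()


async def demo() -> None:
    shim = MiniSelectorThread(asyncio.get_running_loop())
    a, b = socket.socketpair()
    readable = asyncio.Event()
    shim.add_reader(a.fileno(), readable.set)
    b.send(b"ping")
    await readable.wait()
    print(a.recv(1024))  # b"ping"
    shim.remove_reader(a.fileno())
    await shim.aclose()
    a.close()
    b.close()


asyncio.run(demo())

The explicit aclose() mirrors the new close() above: cancel the select task, wake the helper so its blocking select() returns, then join the executor. The module-level _atexit_callback in the diff performs the same wake-up for wrappers that are never closed explicitly, which is why its registration order relative to the thread pool's own exit handler matters.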
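
The Windows select() quirk handled in _run_select can be exercised on its own. The helper below (select_rw is an invented name) restates the workaround described in the comment, the same one used by CPython's selectors module: also select the write fds for exceptional conditions and merge that set into the writable results.

import select
import socket


def select_rw(read_fds, write_fds, timeout=None):
    # On Windows a socket with a pending error is reported in the exceptional
    # set rather than the writable set, so ask for both and merge them.
    rs, ws, xs = select.select(read_fds, write_fds, write_fds, timeout)
    return rs, ws + xs


a, b = socket.socketpair()
b.send(b"x")
readable, writable = select_rw([a.fileno()], [b.fileno()], 0)
print(readable, writable)  # a's fd is readable, b's fd is writable
a.close()
b.close()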