From: Ben Darnell Date: Tue, 1 Sep 2020 20:48:47 +0000 (-0400) Subject: asyncio: Refactor selector to callbacks instead of coroutine X-Git-Tag: v6.1.0b1~9^2~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b03acc54c468c0cf9a3c139b3e59e8627fd7e64f;p=thirdparty%2Ftornado.git asyncio: Refactor selector to callbacks instead of coroutine Restarting the event loop to "cleanly" shut down a coroutine introduces other problems (mainly manifesting as errors logged while running tornado.test.gen_test). Replace the coroutine with a pair of callbacks so we don't need to do anything special to shut down without logging warnings. --- diff --git a/tornado/platform/asyncio.py b/tornado/platform/asyncio.py index ccbf4caf8..390d5259b 100644 --- a/tornado/platform/asyncio.py +++ b/tornado/platform/asyncio.py @@ -423,10 +423,9 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop): "_handle_event", "_readers", "_real_loop", + "_start_select", "_run_select", - "_select_loop", - "_selector_task", - "_start_selector", + "_handle_select", "_wake_selector", "_waker_r", "_waker_w", @@ -449,9 +448,8 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop): # available (we'll keep it 100% busy) instead of contending # with the application for a thread in the default executor. self._executor = concurrent.futures.ThreadPoolExecutor(1) - self._selector_task = None # Start the select loop once the loop is started. - self._real_loop.call_soon(self._start_selector) + self._real_loop.call_soon(self._start_select) self._readers = {} # type: Dict[_FileDescriptorLike, Callable] self._writers = {} # type: Dict[_FileDescriptorLike, Callable] @@ -475,16 +473,6 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop): self._waker_w.close() def close(self) -> None: - if self._selector_task is not None: - self._selector_task.cancel() - try: - # Cancellation is not immediate (coroutines are given - # a chance to catch the error and recover) so we must - # restart the loop here to allow our selector task to - # finish and avoid logging warnings at shutdown. - self._real_loop.run_until_complete(self._selector_task) - except asyncio.CancelledError: - pass self._wake_selector() self._executor.shutdown() _waker_sockets.discard(self._waker_w) @@ -504,6 +492,18 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop): except BlockingIOError: pass + def _start_select(self) -> None: + # Capture reader and writer sets here in the event loop + # thread to avoid any problems with concurrent + # modification while the select loop uses them. + f = self.run_in_executor( + self._executor, + self._run_select, + list(self._readers.keys()), + list(self._writers.keys()), + ) + asyncio.ensure_future(f).add_done_callback(self._handle_select) + def _run_select( self, to_read: List[int], to_write: List[int] ) -> Tuple[List[int], List[int]]: @@ -543,24 +543,13 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop): raise return rs, ws - def _start_selector(self) -> None: - self._selector_task = asyncio.create_task(self._select_loop()) # type: ignore - - async def _select_loop(self) -> None: - while True: - # Capture reader and writer sets here in the event loop - # thread to avoid any problems with concurrent - # modification while the select loop uses them. - rs, ws = await self.run_in_executor( - self._executor, - self._run_select, - list(self._readers.keys()), - list(self._writers.keys()), - ) - for r in rs: - self._handle_event(r, self._readers) - for w in ws: - self._handle_event(w, self._writers) + def _handle_select(self, f: "asyncio.Future[Tuple[List[int], List[int]]]") -> None: + rs, ws = f.result() + for r in rs: + self._handle_event(r, self._readers) + for w in ws: + self._handle_event(w, self._writers) + self._start_select() def _handle_event( self, fd: "_FileDescriptorLike", cb_map: Dict["_FileDescriptorLike", Callable],