]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
asyncio: Refactor selector to callbacks instead of coroutine
authorBen Darnell <ben@bendarnell.com>
Tue, 1 Sep 2020 20:48:47 +0000 (16:48 -0400)
committerBen Darnell <ben@bendarnell.com>
Wed, 2 Sep 2020 15:10:13 +0000 (11:10 -0400)
Restarting the event loop to "cleanly" shut down a coroutine introduces
other problems (mainly manifesting as errors logged while running
tornado.test.gen_test). Replace the coroutine with a pair of callbacks
so we don't need to do anything special to shut down without logging
warnings.

tornado/platform/asyncio.py

index ccbf4caf80ddf65f116e97fea04ba40cb61473d1..390d5259be33ba9a44175f0f95820cdd3d57977c 100644 (file)
@@ -423,10 +423,9 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
         "_handle_event",
         "_readers",
         "_real_loop",
+        "_start_select",
         "_run_select",
-        "_select_loop",
-        "_selector_task",
-        "_start_selector",
+        "_handle_select",
         "_wake_selector",
         "_waker_r",
         "_waker_w",
@@ -449,9 +448,8 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
         # available (we'll keep it 100% busy) instead of contending
         # with the application for a thread in the default executor.
         self._executor = concurrent.futures.ThreadPoolExecutor(1)
-        self._selector_task = None
         # Start the select loop once the loop is started.
-        self._real_loop.call_soon(self._start_selector)
+        self._real_loop.call_soon(self._start_select)
 
         self._readers = {}  # type: Dict[_FileDescriptorLike, Callable]
         self._writers = {}  # type: Dict[_FileDescriptorLike, Callable]
@@ -475,16 +473,6 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
         self._waker_w.close()
 
     def close(self) -> None:
-        if self._selector_task is not None:
-            self._selector_task.cancel()
-            try:
-                # Cancellation is not immediate (coroutines are given
-                # a chance to catch the error and recover) so we must
-                # restart the loop here to allow our selector task to
-                # finish and avoid logging warnings at shutdown.
-                self._real_loop.run_until_complete(self._selector_task)
-            except asyncio.CancelledError:
-                pass
         self._wake_selector()
         self._executor.shutdown()
         _waker_sockets.discard(self._waker_w)
@@ -504,6 +492,18 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
         except BlockingIOError:
             pass
 
+    def _start_select(self) -> None:
+        # Capture reader and writer sets here in the event loop
+        # thread to avoid any problems with concurrent
+        # modification while the select loop uses them.
+        f = self.run_in_executor(
+            self._executor,
+            self._run_select,
+            list(self._readers.keys()),
+            list(self._writers.keys()),
+        )
+        asyncio.ensure_future(f).add_done_callback(self._handle_select)
+
     def _run_select(
         self, to_read: List[int], to_write: List[int]
     ) -> Tuple[List[int], List[int]]:
@@ -543,24 +543,13 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
             raise
         return rs, ws
 
-    def _start_selector(self) -> None:
-        self._selector_task = asyncio.create_task(self._select_loop())  # type: ignore
-
-    async def _select_loop(self) -> None:
-        while True:
-            # Capture reader and writer sets here in the event loop
-            # thread to avoid any problems with concurrent
-            # modification while the select loop uses them.
-            rs, ws = await self.run_in_executor(
-                self._executor,
-                self._run_select,
-                list(self._readers.keys()),
-                list(self._writers.keys()),
-            )
-            for r in rs:
-                self._handle_event(r, self._readers)
-            for w in ws:
-                self._handle_event(w, self._writers)
+    def _handle_select(self, f: "asyncio.Future[Tuple[List[int], List[int]]]") -> None:
+        rs, ws = f.result()
+        for r in rs:
+            self._handle_event(r, self._readers)
+        for w in ws:
+            self._handle_event(w, self._writers)
+        self._start_select()
 
     def _handle_event(
         self, fd: "_FileDescriptorLike", cb_map: Dict["_FileDescriptorLike", Callable],