"_handle_event",
"_readers",
"_real_loop",
+ "_start_select",
"_run_select",
- "_select_loop",
- "_selector_task",
- "_start_selector",
+ "_handle_select",
"_wake_selector",
"_waker_r",
"_waker_w",
# available (we'll keep it 100% busy) instead of contending
# with the application for a thread in the default executor.
self._executor = concurrent.futures.ThreadPoolExecutor(1)
- self._selector_task = None
# Start the select loop once the loop is started.
- self._real_loop.call_soon(self._start_selector)
+ self._real_loop.call_soon(self._start_select)
self._readers = {} # type: Dict[_FileDescriptorLike, Callable]
self._writers = {} # type: Dict[_FileDescriptorLike, Callable]
self._waker_w.close()
def close(self) -> None:
- if self._selector_task is not None:
- self._selector_task.cancel()
- try:
- # Cancellation is not immediate (coroutines are given
- # a chance to catch the error and recover) so we must
- # restart the loop here to allow our selector task to
- # finish and avoid logging warnings at shutdown.
- self._real_loop.run_until_complete(self._selector_task)
- except asyncio.CancelledError:
- pass
self._wake_selector()
self._executor.shutdown()
_waker_sockets.discard(self._waker_w)
except BlockingIOError:
pass
+ def _start_select(self) -> None:
+ # Capture reader and writer sets here in the event loop
+ # thread to avoid any problems with concurrent
+ # modification while the select loop uses them.
+ f = self.run_in_executor(
+ self._executor,
+ self._run_select,
+ list(self._readers.keys()),
+ list(self._writers.keys()),
+ )
+ asyncio.ensure_future(f).add_done_callback(self._handle_select)
+
def _run_select(
self, to_read: List[int], to_write: List[int]
) -> Tuple[List[int], List[int]]:
raise
return rs, ws
- def _start_selector(self) -> None:
- self._selector_task = asyncio.create_task(self._select_loop()) # type: ignore
-
- async def _select_loop(self) -> None:
- while True:
- # Capture reader and writer sets here in the event loop
- # thread to avoid any problems with concurrent
- # modification while the select loop uses them.
- rs, ws = await self.run_in_executor(
- self._executor,
- self._run_select,
- list(self._readers.keys()),
- list(self._writers.keys()),
- )
- for r in rs:
- self._handle_event(r, self._readers)
- for w in ws:
- self._handle_event(w, self._writers)
+ def _handle_select(self, f: "asyncio.Future[Tuple[List[int], List[int]]]") -> None:
+ rs, ws = f.result()
+ for r in rs:
+ self._handle_event(r, self._readers)
+ for w in ws:
+ self._handle_event(w, self._writers)
+ self._start_select()
def _handle_event(
self, fd: "_FileDescriptorLike", cb_map: Dict["_FileDescriptorLike", Callable],