_T = TypeVar("_T")
-# Collection of sockets to write to at shutdown to wake up any selector threads.
-_waker_sockets = set() # type: Set[socket.socket]
+# Collection of selector thread event loops to shut down on exit.
+_selector_loops = set() # type: Set[AddThreadSelectorEventLoop]
def _atexit_callback() -> None:
- for fd in _waker_sockets:
+ for loop in _selector_loops:
+ with loop._select_cond:
+ loop._closing_selector = True
+ loop._select_cond.notify()
try:
- fd.send(b"a")
+ loop._waker_w.send(b"a")
except BlockingIOError:
pass
+ # If we don't join our (daemon) thread here, we may get a deadlock
+ # during interpreter shutdown. I don't really understand why. This
+ # deadlock happens every time in CI (both travis and appveyor) but
+ # I've never been able to reproduce locally.
+ loop._thread.join()
+ _selector_loops.clear()
-# atexit callbacks are run in LIFO order. Our callback must run before
-# ThreadPoolExecutor's or it will deadlock (the pool's threads can't
-# finish their work items until we write to their waker sockets). In
-# recent versions of Python the thread pool atexit callback is
-# registered in a getattr hook the first time TPE is *referenced*
-# (instead of older versions of python where it was registered when
-# concurrent.futures was imported).
-concurrent.futures.ThreadPoolExecutor
atexit.register(_atexit_callback)
# in this set are proxied through to the underlying loop.
MY_ATTRIBUTES = {
"_consume_waker",
- "_executor",
+ "_select_cond",
+ "_select_args",
+ "_closing_selector",
+ "_thread",
"_handle_event",
"_readers",
"_real_loop",
def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
self._real_loop = real_loop
- # Create our own executor to ensure we always have a thread
- # available (we'll keep it 100% busy) instead of contending
- # with the application for a thread in the default executor.
- self._executor = concurrent.futures.ThreadPoolExecutor(1)
+
+ # Create a thread to run the select system call. We manage this thread
+ # manually so we can trigger a clean shutdown from an atexit hook. Note
+ # that due to the order of operations at shutdown, only daemon threads
+ # can be shut down in this way (non-daemon threads would require the
+ # introduction of a new hook: https://bugs.python.org/issue41962)
+ self._select_cond = threading.Condition()
+ self._select_args = (
+ None
+ ) # type: Optional[Tuple[List[_FileDescriptorLike], List[_FileDescriptorLike]]]
+ self._closing_selector = False
+ self._thread = threading.Thread(
+ name="Tornado selector", daemon=True, target=self._run_select,
+ )
+ self._thread.start()
# Start the select loop once the loop is started.
self._real_loop.call_soon(self._start_select)
self._waker_r, self._waker_w = socket.socketpair()
self._waker_r.setblocking(False)
self._waker_w.setblocking(False)
- _waker_sockets.add(self._waker_w)
+ _selector_loops.add(self)
self.add_reader(self._waker_r, self._consume_waker)
def __del__(self) -> None:
# can get a clean shutdown notification. If we're just left to
# be GC'd, we must explicitly close our sockets to avoid
# logging warnings.
- _waker_sockets.discard(self._waker_w)
+ _selector_loops.discard(self)
self._waker_r.close()
self._waker_w.close()
def close(self) -> None:
+ with self._select_cond:
+ self._closing_selector = True
+ self._select_cond.notify()
self._wake_selector()
- self._executor.shutdown()
- _waker_sockets.discard(self._waker_w)
+ self._thread.join()
+ _selector_loops.discard(self)
self._waker_r.close()
self._waker_w.close()
self._real_loop.close()
# Capture reader and writer sets here in the event loop
# thread to avoid any problems with concurrent
# modification while the select loop uses them.
- f = self.run_in_executor(
- self._executor,
- self._run_select,
- list(self._readers.keys()),
- list(self._writers.keys()),
- )
- asyncio.ensure_future(f).add_done_callback(self._handle_select)
-
- def _run_select(
- self, to_read: List[int], to_write: List[int]
- ) -> Tuple[List[int], List[int]]:
- # We use the simpler interface of the select module instead of
- # the more stateful interface in the selectors module because
- # this class is only intended for use on windows, where
- # select.select is the only option. The selector interface
- # does not have well-documented thread-safety semantics that
- # we can rely on so ensuring proper synchronization would be
- # tricky.
- try:
- # On windows, selecting on a socket for write will not
- # return the socket when there is an error (but selecting
- # for reads works). Also select for errors when selecting
- # for writes, and merge the results.
- #
- # This pattern is also used in
- # https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317
- rs, ws, xs = select.select(to_read, to_write, to_write)
- ws = ws + xs
- except OSError as e:
- # After remove_reader or remove_writer is called, the file
- # descriptor may subsequently be closed on the event loop
- # thread. It's possible that this select thread hasn't
- # gotten into the select system call by the time that
- # happens in which case (at least on macOS), select may
- # raise a "bad file descriptor" error. If we get that
- # error, check and see if we're also being woken up by
- # polling the waker alone. If we are, just return to the
- # event loop and we'll get the updated set of file
- # descriptors on the next iteration. Otherwise, raise the
- # original error.
- if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF):
- rs, _, _ = select.select([self._waker_r.fileno()], [], [], 0)
- if rs:
- return rs, []
- raise
- return rs, ws
-
- def _handle_select(self, f: "asyncio.Future[Tuple[List[int], List[int]]]") -> None:
- rs, ws = f.result()
+ with self._select_cond:
+ assert self._select_args is None
+ self._select_args = (list(self._readers.keys()), list(self._writers.keys()))
+ self._select_cond.notify()
+
+ def _run_select(self) -> None:
+ while True:
+ with self._select_cond:
+ while self._select_args is None and not self._closing_selector:
+ self._select_cond.wait()
+ if self._closing_selector:
+ return
+ assert self._select_args is not None
+ to_read, to_write = self._select_args
+ self._select_args = None
+
+ # We use the simpler interface of the select module instead of
+ # the more stateful interface in the selectors module because
+ # this class is only intended for use on windows, where
+ # select.select is the only option. The selector interface
+ # does not have well-documented thread-safety semantics that
+ # we can rely on so ensuring proper synchronization would be
+ # tricky.
+ try:
+ # On windows, selecting on a socket for write will not
+ # return the socket when there is an error (but selecting
+ # for reads works). Also select for errors when selecting
+ # for writes, and merge the results.
+ #
+ # This pattern is also used in
+ # https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317
+ rs, ws, xs = select.select(to_read, to_write, to_write)
+ ws = ws + xs
+ except OSError as e:
+ # After remove_reader or remove_writer is called, the file
+ # descriptor may subsequently be closed on the event loop
+ # thread. It's possible that this select thread hasn't
+ # gotten into the select system call by the time that
+ # happens in which case (at least on macOS), select may
+ # raise a "bad file descriptor" error. If we get that
+ # error, check and see if we're also being woken up by
+ # polling the waker alone. If we are, just return to the
+ # event loop and we'll get the updated set of file
+ # descriptors on the next iteration. Otherwise, raise the
+ # original error.
+ if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF):
+ rs, _, _ = select.select([self._waker_r.fileno()], [], [], 0)
+ if rs:
+ ws = []
+ else:
+ raise
+ else:
+ raise
+ self._real_loop.call_soon_threadsafe(self._handle_select, rs, ws)
+
+ def _handle_select(
+ self, rs: List["_FileDescriptorLike"], ws: List["_FileDescriptorLike"]
+ ) -> None:
for r in rs:
self._handle_event(r, self._readers)
for w in ws: