from tornado.gen import convert_yielded
from tornado.ioloop import IOLoop, _Selectable
-from typing import Any, TypeVar, Awaitable, Callable, Union, Optional, List, Dict
-
-if typing.TYPE_CHECKING:
- from typing import Set, Tuple # noqa: F401
- from typing_extensions import Protocol
+from typing import (
+ Any,
+ Awaitable,
+ Callable,
+ Dict,
+ List,
+ Optional,
+ Protocol,
+ Set,
+ Tuple,
+ TypeVar,
+ Union,
+)
+
+
+class _HasFileno(Protocol):
+ def fileno(self) -> int:
+ pass
- class _HasFileno(Protocol):
- def fileno(self) -> int:
- pass
- _FileDescriptorLike = Union[int, _HasFileno]
+_FileDescriptorLike = Union[int, _HasFileno]
_T = TypeVar("_T")
# Collection of selector thread event loops to shut down on exit.
-_selector_loops = set() # type: Set[SelectorThread]
+_selector_loops: Set["SelectorThread"] = set()
def _atexit_callback() -> None:
# as windows where the default event loop does not implement these methods.
self.selector_loop = asyncio_loop
if hasattr(asyncio, "ProactorEventLoop") and isinstance(
- asyncio_loop, asyncio.ProactorEventLoop # type: ignore
+ asyncio_loop, asyncio.ProactorEventLoop
):
# Ignore this line for mypy because the abstract method checker
# doesn't understand dynamic proxies.
self.selector_loop = AddThreadSelectorEventLoop(asyncio_loop) # type: ignore
# Maps fd to (fileobj, handler function) pair (as in IOLoop.add_handler)
- self.handlers = {} # type: Dict[int, Tuple[Union[int, _Selectable], Callable]]
+ self.handlers: Dict[int, Tuple[Union[int, _Selectable], Callable]] = {}
# Set of fds listening for reads/writes
- self.readers = set() # type: Set[int]
- self.writers = set() # type: Set[int]
+ self.readers: Set[int] = set()
+ self.writers: Set[int] = set()
self.closing = False
# If an asyncio loop was closed through an asyncio interface
# instead of IOLoop.close(), we'd never hear about it and may
def get_event_loop(self) -> asyncio.AbstractEventLoop:
try:
return super().get_event_loop()
- except (RuntimeError, AssertionError):
- # This was an AssertionError in Python 3.4.2 (which ships with Debian Jessie)
- # and changed to a RuntimeError in 3.4.3.
+ except RuntimeError:
# "There is no current event loop in thread %r"
loop = self.new_event_loop()
self.set_event_loop(loop)
self._real_loop = real_loop
self._select_cond = threading.Condition()
- self._select_args = (
- None
- ) # type: Optional[Tuple[List[_FileDescriptorLike], List[_FileDescriptorLike]]]
+ self._select_args: Optional[
+ Tuple[List[_FileDescriptorLike], List[_FileDescriptorLike]]
+ ] = None
self._closing_selector = False
- self._thread = None # type: Optional[threading.Thread]
+ self._thread: Optional[threading.Thread] = None
self._thread_manager_handle = self._thread_manager()
async def thread_manager_anext() -> None:
lambda: self._real_loop.create_task(thread_manager_anext())
)
- self._readers = {} # type: Dict[_FileDescriptorLike, Callable]
- self._writers = {} # type: Dict[_FileDescriptorLike, Callable]
+ self._readers: Dict[_FileDescriptorLike, Callable] = {}
+ self._writers: Dict[_FileDescriptorLike, Callable] = {}
# Writing to _waker_w will wake up the selector thread, which
# watches for _waker_r to be readable.
pass
def _handle_select(
- self, rs: List["_FileDescriptorLike"], ws: List["_FileDescriptorLike"]
+ self, rs: List[_FileDescriptorLike], ws: List[_FileDescriptorLike]
) -> None:
for r in rs:
self._handle_event(r, self._readers)
def _handle_event(
self,
- fd: "_FileDescriptorLike",
- cb_map: Dict["_FileDescriptorLike", Callable],
+ fd: _FileDescriptorLike,
+ cb_map: Dict[_FileDescriptorLike, Callable],
) -> None:
try:
callback = cb_map[fd]
callback()
def add_reader(
- self, fd: "_FileDescriptorLike", callback: Callable[..., None], *args: Any
+ self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
) -> None:
self._readers[fd] = functools.partial(callback, *args)
self._wake_selector()
def add_writer(
- self, fd: "_FileDescriptorLike", callback: Callable[..., None], *args: Any
+ self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
) -> None:
self._writers[fd] = functools.partial(callback, *args)
self._wake_selector()
- def remove_reader(self, fd: "_FileDescriptorLike") -> bool:
+ def remove_reader(self, fd: _FileDescriptorLike) -> bool:
try:
del self._readers[fd]
except KeyError:
self._wake_selector()
return True
- def remove_writer(self, fd: "_FileDescriptorLike") -> bool:
+ def remove_writer(self, fd: _FileDescriptorLike) -> bool:
try:
del self._writers[fd]
except KeyError: