import types
import typing
-from typing import Any, Callable, Optional, Tuple
+from typing import Any, Callable, Optional, Tuple, Union
_T = typing.TypeVar('_T')
return isinstance(x, FUTURES)
-class DummyExecutor(object):
- def submit(self, fn: Callable[..., _T], *args: Any, **kwargs: Any) -> 'Future[_T]':
- future = Future() # type: Future
+class DummyExecutor(futures.Executor):
+ def submit(self, fn: Callable[..., _T], *args: Any, **kwargs: Any) -> 'futures.Future[_T]':
+ future = futures.Future() # type: futures.Future[_T]
try:
future_set_result_unless_cancelled(future, fn(*args, **kwargs))
except Exception:
IOLoop.current().add_future(a, copy)
-def future_set_result_unless_cancelled(future: 'Future[_T]', value: _T) -> None:
+def future_set_result_unless_cancelled(future: Union['futures.Future[_T]', 'Future[_T]'],
+ value: _T) -> None:
"""Set the given ``value`` as the `Future`'s result, if not cancelled.
Avoids asyncio.InvalidStateError when calling set_result() on
future.set_result(value)
-def future_set_exc_info(future: 'Future[_T]',
+def future_set_exc_info(future: Union['futures.Future[_T]', 'Future[_T]'],
exc_info: Tuple[Optional[type], Optional[BaseException],
Optional[types.TracebackType]]) -> None:
"""Set the given ``exc_info`` as the `Future`'s exception.
future.set_exception(exc_info[1])
+@typing.overload
+def future_add_done_callback(future: 'futures.Future[_T]',
+ callback: Callable[['futures.Future[_T]'], None]) -> None:
+ pass
+
+
+@typing.overload # noqa: F811
def future_add_done_callback(future: 'Future[_T]',
callback: Callable[['Future[_T]'], None]) -> None:
+ pass
+
+
+def future_add_done_callback(future: Union['futures.Future[_T]', 'Future[_T]'], # noqa: F811
+ callback: Callable[..., None]) -> None:
"""Arrange to call ``callback`` when ``future`` is complete.
``callback`` is invoked with one argument, the ``future``.
"""Miscellaneous network utility code."""
+import concurrent.futures
import errno
import os
import sys
from tornado.platform.auto import set_close_exec
from tornado.util import Configurable, errno_from_exception
+import typing
+from typing import List, Callable, Any, Type, Generator, Dict, Union, Tuple
+
+if typing.TYPE_CHECKING:
+ from asyncio import Future # noqa: F401
+ from typing import Awaitable # noqa: F401
+
# Note that the naming of ssl.Purpose is confusing; the purpose
# of a context is to authentiate the opposite side of the connection.
_client_ssl_defaults = ssl.create_default_context(
_DEFAULT_BACKLOG = 128
-def bind_sockets(port, address=None, family=socket.AF_UNSPEC,
- backlog=_DEFAULT_BACKLOG, flags=None, reuse_port=False):
+def bind_sockets(port: int, address: str=None,
+ family: socket.AddressFamily=socket.AF_UNSPEC,
+ backlog: int=_DEFAULT_BACKLOG, flags: int=None,
+ reuse_port: bool=False) -> List[socket.socket]:
"""Creates listening sockets bound to the given port and address.
Returns a list of socket objects (multiple sockets are returned if
if flags is None:
flags = socket.AI_PASSIVE
bound_port = None
- unique_addresses = set()
+ unique_addresses = set() # type: set
for res in sorted(socket.getaddrinfo(address, port, family, socket.SOCK_STREAM,
0, flags), key=lambda x: x[0]):
if res in unique_addresses:
if requested_port == 0 and bound_port is not None:
sockaddr = tuple([host, bound_port] + list(sockaddr[2:]))
- sock.setblocking(0)
+ sock.setblocking(False)
sock.bind(sockaddr)
bound_port = sock.getsockname()[1]
sock.listen(backlog)
if hasattr(socket, 'AF_UNIX'):
- def bind_unix_socket(file, mode=0o600, backlog=_DEFAULT_BACKLOG):
+ def bind_unix_socket(file: str, mode: int=0o600,
+ backlog: int=_DEFAULT_BACKLOG) -> socket.socket:
"""Creates a listening unix socket.
If a socket with the given name already exists, it will be deleted.
if errno_from_exception(e) != errno.ENOPROTOOPT:
# Hurd doesn't support SO_REUSEADDR
raise
- sock.setblocking(0)
+ sock.setblocking(False)
try:
st = os.stat(file)
except OSError as err:
return sock
-def add_accept_handler(sock, callback):
+def add_accept_handler(sock: socket.socket,
+ callback: Callable[[socket.socket, Any], None]) -> Callable[[], None]:
"""Adds an `.IOLoop` event handler to accept new connections on ``sock``.
When a connection is accepted, ``callback(connection, address)`` will
io_loop = IOLoop.current()
removed = [False]
- def accept_handler(fd, events):
+ def accept_handler(fd: int, events: int) -> None:
# More connections may come in while we're handling callbacks;
# to prevent starvation of other tasks we must limit the number
# of connections we accept at a time. Ideally we would accept
set_close_exec(connection.fileno())
callback(connection, address)
- def remove_handler():
+ def remove_handler() -> None:
io_loop.remove_handler(sock)
removed[0] = True
return remove_handler
-def is_valid_ip(ip):
+def is_valid_ip(ip: str) -> bool:
"""Returns true if the given string is a well-formed IP address.
Supports IPv4 and IPv6.
`DefaultExecutorResolver`.
"""
@classmethod
- def configurable_base(cls):
+ def configurable_base(cls) -> Type['Resolver']:
return Resolver
@classmethod
- def configurable_default(cls):
+ def configurable_default(cls) -> Type['Resolver']:
return DefaultExecutorResolver
- def resolve(self, host, port, family=socket.AF_UNSPEC):
+ def resolve(
+ self, host: str, port: int, family: socket.AddressFamily=socket.AF_UNSPEC,
+ ) -> 'Future[List[Tuple[int, Any]]]':
"""Resolves an address.
The ``host`` argument is a string which may be a hostname or a
"""
raise NotImplementedError()
- def close(self):
+ def close(self) -> None:
"""Closes the `Resolver`, freeing any resources used.
.. versionadded:: 3.1
pass
-def _resolve_addr(host, port, family=socket.AF_UNSPEC):
+def _resolve_addr(
+ host: str, port: int, family: socket.AddressFamily=socket.AF_UNSPEC,
+) -> List[Tuple[int, Any]]:
# On Solaris, getaddrinfo fails if the given port is not found
# in /etc/services and no socket type is given, so we must pass
# one here. The socket type used here doesn't seem to actually
# so the addresses we return should still be usable with SOCK_DGRAM.
addrinfo = socket.getaddrinfo(host, port, family, socket.SOCK_STREAM)
results = []
- for family, socktype, proto, canonname, address in addrinfo:
- results.append((family, address))
+ for fam, socktype, proto, canonname, address in addrinfo:
+ results.append((fam, address))
return results
.. versionadded:: 5.0
"""
@gen.coroutine
- def resolve(self, host, port, family=socket.AF_UNSPEC):
+ def resolve(
+ self, host: str, port: int, family: socket.AddressFamily=socket.AF_UNSPEC,
+ ) -> Generator[Any, Any, List[Tuple[int, Any]]]:
result = yield IOLoop.current().run_in_executor(
None, _resolve_addr, host, port, family)
- raise gen.Return(result)
+ return result
class ExecutorResolver(Resolver):
The default `Resolver` now uses `.IOLoop.run_in_executor`; use that instead
of this class.
"""
- def initialize(self, executor=None, close_executor=True):
+ def initialize(self, executor: concurrent.futures.Executor=None,
+ close_executor: bool=True) -> None:
self.io_loop = IOLoop.current()
if executor is not None:
self.executor = executor
self.executor = dummy_executor
self.close_executor = False
- def close(self):
+ def close(self) -> None:
if self.close_executor:
self.executor.shutdown()
- self.executor = None
+ self.executor = None # type: ignore
@run_on_executor
- def resolve(self, host, port, family=socket.AF_UNSPEC):
+ def resolve(
+ self, host: str, port: int, family: socket.AddressFamily=socket.AF_UNSPEC,
+ ) -> List[Tuple[int, Any]]:
return _resolve_addr(host, port, family)
The default `Resolver` now uses `.IOLoop.run_in_executor`; use that instead
of this class.
"""
- def initialize(self):
+ def initialize(self) -> None: # type: ignore
super(BlockingResolver, self).initialize()
_threadpool = None # type: ignore
_threadpool_pid = None # type: int
- def initialize(self, num_threads=10):
+ def initialize(self, num_threads: int=10) -> None: # type: ignore
threadpool = ThreadedResolver._create_threadpool(num_threads)
super(ThreadedResolver, self).initialize(
executor=threadpool, close_executor=False)
@classmethod
- def _create_threadpool(cls, num_threads):
+ def _create_threadpool(cls, num_threads: int) -> concurrent.futures.ThreadPoolExecutor:
pid = os.getpid()
if cls._threadpool_pid != pid:
# Threads cannot survive after a fork, so if our pid isn't what it
# was when we created the pool then delete it.
cls._threadpool = None
if cls._threadpool is None:
- from concurrent.futures import ThreadPoolExecutor
- cls._threadpool = ThreadPoolExecutor(num_threads)
+ cls._threadpool = concurrent.futures.ThreadPoolExecutor(num_threads)
cls._threadpool_pid = pid
return cls._threadpool
.. versionchanged:: 5.0
Added support for host-port-family triplets.
"""
- def initialize(self, resolver, mapping):
+ def initialize(self, resolver: Resolver, mapping: dict) -> None: # type: ignore
self.resolver = resolver
self.mapping = mapping
- def close(self):
+ def close(self) -> None:
self.resolver.close()
- def resolve(self, host, port, family=socket.AF_UNSPEC, *args, **kwargs):
+ def resolve(
+ self, host: str, port: int, family: socket.AddressFamily=socket.AF_UNSPEC,
+ ) -> 'Future[List[Tuple[int, Any]]]':
if (host, port, family) in self.mapping:
host, port = self.mapping[(host, port, family)]
elif (host, port) in self.mapping:
host, port = self.mapping[(host, port)]
elif host in self.mapping:
host = self.mapping[host]
- return self.resolver.resolve(host, port, family, *args, **kwargs)
+ return self.resolver.resolve(host, port, family)
# These are the keyword arguments to ssl.wrap_socket that must be translated
'cert_reqs', 'ca_certs', 'ciphers'])
-def ssl_options_to_context(ssl_options):
+def ssl_options_to_context(ssl_options: Union[Dict[str, Any], ssl.SSLContext]) -> ssl.SSLContext:
"""Try to convert an ``ssl_options`` dictionary to an
`~ssl.SSLContext` object.
return context
-def ssl_wrap_socket(socket, ssl_options, server_hostname=None, **kwargs):
+def ssl_wrap_socket(socket: socket.socket, ssl_options: Union[Dict[str, Any], ssl.SSLContext],
+ server_hostname: str=None, **kwargs: Any) -> ssl.SSLSocket:
"""Returns an ``ssl.SSLSocket`` wrapping the given socket.
``ssl_options`` may be either an `ssl.SSLContext` object or a