from tornado.util import unicode_type
from tornado.web import RequestHandler
-from typing import List, Any, Dict, cast, Iterable, Union, Optional
+from typing import Any, cast, Iterable, Union, Optional
class AuthError(Exception):
def authenticate_redirect(
self,
callback_uri: Optional[str] = None,
- ax_attrs: List[str] = ["name", "email", "language", "username"],
+ ax_attrs: list[str] = ["name", "email", "language", "username"],
) -> None:
"""Redirects to the authentication URL for this service.
async def get_authenticated_user(
self, http_client: Optional[httpclient.AsyncHTTPClient] = None
- ) -> Dict[str, Any]:
+ ) -> dict[str, Any]:
"""Fetches the authenticated user data upon redirect.
This method should be called by the handler that receives the
"""
handler = cast(RequestHandler, self)
# Verify the OpenID response via direct request to the OP
- args: Dict[str, Union[str, bytes]] = {
+ args: dict[str, Union[str, bytes]] = {
k: v[-1] for k, v in handler.request.arguments.items()
}
args["openid.mode"] = "check_authentication"
callback_uri: str,
ax_attrs: Iterable[str] = [],
oauth_scope: Optional[str] = None,
- ) -> Dict[str, str]:
+ ) -> dict[str, str]:
handler = cast(RequestHandler, self)
url = urllib.parse.urljoin(handler.request.full_url(), callback_uri)
args = {
}
)
ax_attrs = set(ax_attrs)
- required: List[str] = []
+ required: list[str] = []
if "name" in ax_attrs:
ax_attrs -= {"name", "firstname", "fullname", "lastname"}
required += ["firstname", "fullname", "lastname"]
def _on_authentication_verified(
self, response: httpclient.HTTPResponse
- ) -> Dict[str, Any]:
+ ) -> dict[str, Any]:
handler = cast(RequestHandler, self)
if b"is_valid:true" not in response.body:
raise AuthError("Invalid OpenID response: %r" % response.body)
async def authorize_redirect(
self,
callback_uri: Optional[str] = None,
- extra_params: Optional[Dict[str, Any]] = None,
+ extra_params: Optional[dict[str, Any]] = None,
http_client: Optional[httpclient.AsyncHTTPClient] = None,
) -> None:
"""Redirects the user to obtain OAuth authorization for this service.
async def get_authenticated_user(
self, http_client: Optional[httpclient.AsyncHTTPClient] = None
- ) -> Dict[str, Any]:
+ ) -> dict[str, Any]:
"""Gets the OAuth authorized user and access token.
This method should be called from the handler for your
)
if cookie_key != request_key:
raise AuthError("Request token does not match cookie")
- token: Dict[str, Union[str, bytes]] = dict(key=cookie_key, secret=cookie_secret)
+ token: dict[str, Union[str, bytes]] = dict(key=cookie_key, secret=cookie_secret)
if oauth_verifier:
token["verifier"] = oauth_verifier
if http_client is None:
def _oauth_request_token_url(
self,
callback_uri: Optional[str] = None,
- extra_params: Optional[Dict[str, Any]] = None,
+ extra_params: Optional[dict[str, Any]] = None,
) -> str:
handler = cast(RequestHandler, self)
consumer_token = self._oauth_consumer_token()
)
handler.redirect(authorize_url + "?" + urllib.parse.urlencode(args))
- def _oauth_access_token_url(self, request_token: Dict[str, Any]) -> str:
+ def _oauth_access_token_url(self, request_token: dict[str, Any]) -> str:
consumer_token = self._oauth_consumer_token()
url = self._OAUTH_ACCESS_TOKEN_URL # type: ignore
args = dict(
args["oauth_signature"] = signature
return url + "?" + urllib.parse.urlencode(args)
- def _oauth_consumer_token(self) -> Dict[str, Any]:
+ def _oauth_consumer_token(self) -> dict[str, Any]:
"""Subclasses must override this to return their OAuth consumer keys.
The return value should be a `dict` with keys ``key`` and ``secret``.
raise NotImplementedError()
async def _oauth_get_user_future(
- self, access_token: Dict[str, Any]
- ) -> Dict[str, Any]:
+ self, access_token: dict[str, Any]
+ ) -> dict[str, Any]:
"""Subclasses must override this to get basic information about the
user.
def _oauth_request_parameters(
self,
url: str,
- access_token: Dict[str, Any],
- parameters: Dict[str, Any] = {},
+ access_token: dict[str, Any],
+ parameters: dict[str, Any] = {},
method: str = "GET",
- ) -> Dict[str, Any]:
+ ) -> dict[str, Any]:
"""Returns the OAuth parameters as a dict for the given request.
parameters should include all POST arguments and query string arguments
redirect_uri: Optional[str] = None,
client_id: Optional[str] = None,
client_secret: Optional[str] = None,
- extra_params: Optional[Dict[str, Any]] = None,
- scope: Optional[List[str]] = None,
+ extra_params: Optional[dict[str, Any]] = None,
+ scope: Optional[list[str]] = None,
response_type: str = "code",
) -> None:
"""Redirects the user to obtain OAuth authorization for this service.
client_id: Optional[str] = None,
client_secret: Optional[str] = None,
code: Optional[str] = None,
- extra_params: Optional[Dict[str, Any]] = None,
+ extra_params: Optional[dict[str, Any]] = None,
) -> str:
url = self._OAUTH_ACCESS_TOKEN_URL # type: ignore
- args: Dict[str, str] = {}
+ args: dict[str, str] = {}
if redirect_uri is not None:
args["redirect_uri"] = redirect_uri
if code is not None:
self,
url: str,
access_token: Optional[str] = None,
- post_args: Optional[Dict[str, Any]] = None,
+ post_args: Optional[dict[str, Any]] = None,
**args: Any,
) -> Any:
"""Fetches the given URL auth an OAuth2 access token.
async def twitter_request(
self,
path: str,
- access_token: Dict[str, Any],
- post_args: Optional[Dict[str, Any]] = None,
+ access_token: dict[str, Any],
+ post_args: Optional[dict[str, Any]] = None,
**args: Any,
) -> Any:
"""Fetches the given API path, e.g., ``statuses/user_timeline/btaylor``
response = await http.fetch(url)
return escape.json_decode(response.body)
- def _oauth_consumer_token(self) -> Dict[str, Any]:
+ def _oauth_consumer_token(self) -> dict[str, Any]:
handler = cast(RequestHandler, self)
handler.require_setting("twitter_consumer_key", "Twitter OAuth")
handler.require_setting("twitter_consumer_secret", "Twitter OAuth")
)
async def _oauth_get_user_future(
- self, access_token: Dict[str, Any]
- ) -> Dict[str, Any]:
+ self, access_token: dict[str, Any]
+ ) -> dict[str, Any]:
user = await self.twitter_request(
"/account/verify_credentials", access_token=access_token
)
_OAUTH_NO_CALLBACKS = False
_OAUTH_SETTINGS_KEY = "google_oauth"
- def get_google_oauth_settings(self) -> Dict[str, str]:
+ def get_google_oauth_settings(self) -> dict[str, str]:
"""Return the Google OAuth 2.0 credentials that you created with
[Google Cloud
Platform](https://console.cloud.google.com/apis/credentials). The dict
code: str,
client_id: Optional[str] = None,
client_secret: Optional[str] = None,
- ) -> Dict[str, Any]:
+ ) -> dict[str, Any]:
"""Handles the login for the Google user, returning an access token.
The result is a dictionary containing an ``access_token`` field
client_id: str,
client_secret: str,
code: str,
- extra_fields: Optional[Dict[str, Any]] = None,
- ) -> Optional[Dict[str, Any]]:
+ extra_fields: Optional[dict[str, Any]] = None,
+ ) -> Optional[dict[str, Any]]:
"""Handles the login for the Facebook user, returning a user object.
Example usage:
self,
path: str,
access_token: Optional[str] = None,
- post_args: Optional[Dict[str, Any]] = None,
+ post_args: Optional[dict[str, Any]] = None,
**args: Any,
) -> Any:
"""Fetches the given relative API path, e.g., "/btaylor/picture"
def _oauth_signature(
- consumer_token: Dict[str, Any],
+ consumer_token: dict[str, Any],
method: str,
url: str,
- parameters: Dict[str, Any] = {},
- token: Optional[Dict[str, Any]] = None,
+ parameters: dict[str, Any] = {},
+ token: Optional[dict[str, Any]] = None,
) -> bytes:
"""Calculates the HMAC-SHA1 OAuth signature for the given request.
def _oauth10a_signature(
- consumer_token: Dict[str, Any],
+ consumer_token: dict[str, Any],
method: str,
url: str,
- parameters: Dict[str, Any] = {},
- token: Optional[Dict[str, Any]] = None,
+ parameters: dict[str, Any] = {},
+ token: Optional[dict[str, Any]] = None,
) -> bytes:
"""Calculates the HMAC-SHA1 OAuth 1.0a signature for the given request.
return urllib.parse.quote(val, safe="~")
-def _oauth_parse_response(body: bytes) -> Dict[str, Any]:
+def _oauth_parse_response(body: bytes) -> dict[str, Any]:
# I can't find an officially-defined encoding for oauth responses and
# have never seen anyone use non-ascii. Leave the response in a byte
# string for python 2, and use utf8 on python 3.
except ImportError:
signal = None # type: ignore
-from typing import Callable, Dict, Optional, List, Union
+from typing import Callable, Optional, Union
# os.execv is broken on Windows and can't properly parse command line
# arguments and executable name if they contain whitespaces. subprocess
weakref.WeakKeyDictionary()
)
_autoreload_is_main = False
-_original_argv: Optional[List[str]] = None
+_original_argv: Optional[list[str]] = None
_original_spec = None
_io_loops[io_loop] = True
if len(_io_loops) > 1:
gen_log.warning("tornado.autoreload started more than once in the same process")
- modify_times: Dict[str, float] = {}
+ modify_times: dict[str, float] = {}
callback = functools.partial(_reload_on_update, modify_times)
scheduler = ioloop.PeriodicCallback(callback, check_time)
scheduler.start()
_reload_hooks.append(fn)
-def _reload_on_update(modify_times: Dict[str, float]) -> None:
+def _reload_on_update(modify_times: dict[str, float]) -> None:
if _reload_attempted:
# We already tried to reload and it didn't work, so don't try again.
return
_check_file(modify_times, path)
-def _check_file(modify_times: Dict[str, float], path: str) -> None:
+def _check_file(modify_times: dict[str, float], path: str) -> None:
try:
modified = os.stat(path).st_mtime
except Exception:
from tornado.log import app_log
import typing
-from typing import Any, Callable, Optional, Tuple, Union
+from typing import Any, Callable, Optional, Union
_T = typing.TypeVar("_T")
def future_set_exc_info(
future: "Union[futures.Future[_T], Future[_T]]",
- exc_info: Tuple[
+ exc_info: tuple[
Optional[type], Optional[BaseException], Optional[types.TracebackType]
],
) -> None:
)
from tornado.log import app_log
-from typing import Dict, Any, Callable, Union, Optional
+from typing import Any, Callable, Union, Optional
import typing
if typing.TYPE_CHECKING:
class CurlAsyncHTTPClient(AsyncHTTPClient):
def initialize( # type: ignore
- self, max_clients: int = 10, defaults: Optional[Dict[str, Any]] = None
+ self, max_clients: int = 10, defaults: Optional[dict[str, Any]] = None
) -> None:
super().initialize(defaults=defaults)
# Typeshed is incomplete for CurlMulti, so just use Any for now.
self._curls = [self._curl_create() for i in range(max_clients)]
self._free_list = self._curls[:]
self._requests: Deque[
- Tuple[HTTPRequest, Callable[[HTTPResponse], None], float]
+ tuple[HTTPRequest, Callable[[HTTPResponse], None], float]
] = collections.deque()
- self._fds: Dict[int, int] = {}
+ self._fds: dict[int, int] = {}
self._timeout: Optional[object] = None
# libcurl has bugs that sometimes cause it to not report all
from tornado.util import unicode_type
import typing
-from typing import Union, Any, Optional, Dict, List, Callable
+from typing import Union, Any, Optional, Callable
def xhtml_escape(value: Union[str, bytes]) -> str:
def parse_qs_bytes(
qs: Union[str, bytes], keep_blank_values: bool = False, strict_parsing: bool = False
-) -> Dict[str, List[bytes]]:
+) -> dict[str, list[bytes]]:
"""Parses a query string like urlparse.parse_qs,
but takes bytes and returns the values as byte strings.
shorten: bool = False,
extra_params: Union[str, Callable[[str], str]] = "",
require_protocol: bool = False,
- permitted_protocols: List[str] = ["http", "https"],
+ permitted_protocols: list[str] = ["http", "https"],
) -> str:
"""Converts plain text into HTML with links.
_T = typing.TypeVar("_T")
_Yieldable = Union[
- None, Awaitable, List[Awaitable], Dict[Any, Awaitable], concurrent.futures.Future
+ None, Awaitable, list[Awaitable], dict[Any, Awaitable], concurrent.futures.Future
]
"""
- _unfinished: Dict[Future, Union[int, str]] = {}
+ _unfinished: dict[Future, Union[int, str]] = {}
def __init__(self, *args: Future, **kwargs: Future) -> None:
if args and kwargs:
@overload
def multi(
children: Sequence[_Yieldable],
- quiet_exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]] = (),
-) -> Future[List]: ...
+ quiet_exceptions: Union[type[Exception], tuple[type[Exception], ...]] = (),
+) -> Future[list]: ...
@overload
def multi(
children: Mapping[Any, _Yieldable],
- quiet_exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]] = (),
-) -> Future[Dict]: ...
+ quiet_exceptions: Union[type[Exception], tuple[type[Exception], ...]] = (),
+) -> Future[dict]: ...
def multi(
Use `multi` instead.
"""
if isinstance(children, dict):
- keys: Optional[List] = list(children.keys())
+ keys: Optional[list] = list(children.keys())
children_seq: Iterable = children.values()
else:
keys = None
else:
future_set_result_unless_cancelled(future, result_list)
- listening: Set[Future] = set()
+ listening: set[Future] = set()
for f in children_futs:
if f not in listening:
listening.add(f)
return True
def handle_exception(
- self, typ: Type[Exception], value: Exception, tb: types.TracebackType
+ self, typ: type[Exception], value: Exception, tb: types.TracebackType
) -> bool:
if not self.running and not self.finished:
self.future = Future()
from tornado.util import GzipDecompressor
-from typing import cast, Optional, Type, Awaitable, Callable, Union, Tuple
+from typing import cast, Optional, Type, Awaitable, Callable, Union
CR_OR_LF_RE = re.compile(b"\r|\n")
if not self._finish_future.done():
future_set_result_unless_cancelled(self._finish_future, None)
- def _parse_headers(self, data: bytes) -> Tuple[str, httputil.HTTPHeaders]:
+ def _parse_headers(self, data: bytes) -> tuple[str, httputil.HTTPHeaders]:
# The lstrip removes newlines that some implementations sometimes
# insert between messages of a reused connection. Per RFC 7230,
# we SHOULD ignore at least one empty line before the request.
from tornado.ioloop import IOLoop
from tornado.util import Configurable
-from typing import Type, Any, Union, Dict, Callable, Optional, Awaitable, cast
+from typing import Type, Any, Union, Callable, Optional, Awaitable, cast
class HTTPClient:
"""
- _instance_cache: Dict[IOLoop, "AsyncHTTPClient"]
+ _instance_cache: dict[IOLoop, "AsyncHTTPClient"]
@classmethod
- def configurable_base(cls) -> Type[Configurable]:
+ def configurable_base(cls) -> type[Configurable]:
return AsyncHTTPClient
@classmethod
- def configurable_default(cls) -> Type[Configurable]:
+ def configurable_default(cls) -> type[Configurable]:
from tornado.simple_httpclient import SimpleAsyncHTTPClient
return SimpleAsyncHTTPClient
@classmethod
- def _async_clients(cls) -> Dict[IOLoop, "AsyncHTTPClient"]:
+ def _async_clients(cls) -> dict[IOLoop, "AsyncHTTPClient"]:
attr_name = "_async_client_dict_" + cls.__name__
if not hasattr(cls, attr_name):
setattr(cls, attr_name, weakref.WeakKeyDictionary())
instance_cache[instance.io_loop] = instance
return instance
- def initialize(self, defaults: Optional[Dict[str, Any]] = None) -> None:
+ def initialize(self, defaults: Optional[dict[str, Any]] = None) -> None:
self.io_loop = IOLoop.current()
self.defaults = dict(HTTPRequest._DEFAULTS)
if defaults is not None:
class HTTPRequest:
"""HTTP client request object."""
- _headers: Union[Dict[str, str], httputil.HTTPHeaders]
+ _headers: Union[dict[str, str], httputil.HTTPHeaders]
# Default values for HTTPRequest parameters.
# Merged with the values on the request object by AsyncHTTPClient
self,
url: str,
method: str = "GET",
- headers: Optional[Union[Dict[str, str], httputil.HTTPHeaders]] = None,
+ headers: Optional[Union[dict[str, str], httputil.HTTPHeaders]] = None,
body: Optional[Union[bytes, str]] = None,
auth_username: Optional[str] = None,
auth_password: Optional[str] = None,
] = None,
expect_100_continue: bool = False,
decompress_response: Optional[bool] = None,
- ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None,
+ ssl_options: Optional[Union[dict[str, Any], ssl.SSLContext]] = None,
) -> None:
r"""All parameters except ``url`` are optional.
return self._headers # type: ignore
@headers.setter
- def headers(self, value: Union[Dict[str, str], httputil.HTTPHeaders]) -> None:
+ def headers(self, value: Union[dict[str, str], httputil.HTTPHeaders]) -> None:
if value is None:
self._headers = httputil.HTTPHeaders()
else:
effective_url: Optional[str] = None,
error: Optional[BaseException] = None,
request_time: Optional[float] = None,
- time_info: Optional[Dict[str, float]] = None,
+ time_info: Optional[dict[str, float]] = None,
reason: Optional[str] = None,
start_time: Optional[float] = None,
) -> None:
"""
def __init__(
- self, request: HTTPRequest, defaults: Optional[Dict[str, Any]]
+ self, request: HTTPRequest, defaults: Optional[dict[str, Any]]
) -> None:
self.request = request
self.defaults = defaults
from tornado.util import Configurable
import typing
-from typing import Union, Any, Dict, Callable, List, Type, Tuple, Optional, Awaitable
+from typing import Union, Any, Callable, Optional, Awaitable
if typing.TYPE_CHECKING:
from typing import Set # noqa: F401
],
no_keep_alive: bool = False,
xheaders: bool = False,
- ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None,
+ ssl_options: Optional[Union[dict[str, Any], ssl.SSLContext]] = None,
protocol: Optional[str] = None,
decompress_request: bool = False,
chunk_size: Optional[int] = None,
body_timeout: Optional[float] = None,
max_body_size: Optional[int] = None,
max_buffer_size: Optional[int] = None,
- trusted_downstream: Optional[List[str]] = None,
+ trusted_downstream: Optional[list[str]] = None,
) -> None:
# This method's signature is not extracted with autodoc
# because we want its arguments to appear on the class
max_buffer_size=max_buffer_size,
read_chunk_size=chunk_size,
)
- self._connections: Set[HTTP1ServerConnection] = set()
+ self._connections: set[HTTP1ServerConnection] = set()
self.trusted_downstream = trusted_downstream
@classmethod
- def configurable_base(cls) -> Type[Configurable]:
+ def configurable_base(cls) -> type[Configurable]:
return HTTPServer
@classmethod
- def configurable_default(cls) -> Type[Configurable]:
+ def configurable_default(cls) -> type[Configurable]:
return HTTPServer
async def close_all_connections(self) -> None:
conn = next(iter(self._connections))
await conn.close()
- def handle_stream(self, stream: iostream.IOStream, address: Tuple) -> None:
+ def handle_stream(self, stream: iostream.IOStream, address: tuple) -> None:
context = _HTTPRequestContext(
stream, address, self.protocol, self.trusted_downstream
)
self.request_callback = request_callback
self.request: Optional[httputil.HTTPServerRequest] = None
self.delegate = None
- self._chunks: List[bytes] = []
+ self._chunks: list[bytes] = []
def headers_received(
self,
def __init__(
self,
stream: iostream.IOStream,
- address: Tuple,
+ address: tuple,
protocol: Optional[str],
- trusted_downstream: Optional[List[str]] = None,
+ trusted_downstream: Optional[list[str]] = None,
) -> None:
self.address = address
# Save the socket's address family now so we know how to
Callable,
Pattern,
Any,
- Dict,
TypeVar,
- Tuple,
)
from types import TracebackType
def __init__(self) -> None:
# A sequence of (False, bytearray) and (True, memoryview) objects
- self._buffers: Deque[Tuple[bool, Union[bytearray, memoryview]]] = (
+ self._buffers: Deque[tuple[bool, Union[bytearray, memoryview]]] = (
collections.deque()
)
# Position in the first buffer
self._read_partial = False
self._read_until_close = False
self._read_future: Optional[Future] = None
- self._write_futures: Deque[Tuple[int, Future[None]]] = collections.deque()
+ self._write_futures: Deque[tuple[int, Future[None]]] = collections.deque()
self._close_callback: Optional[Callable[[], None]] = None
self._connect_future: Optional[Future[IOStream]] = None
# _ssl_connect_future should be defined in SSLIOStream
None,
bool,
BaseException,
- Tuple[
+ tuple[
"Optional[Type[BaseException]]",
Optional[BaseException],
Optional[TracebackType],
self._signal_closed()
def _signal_closed(self) -> None:
- futures: List[Future] = []
+ futures: list[Future] = []
if self._read_future is not None:
futures.append(self._read_future)
self._read_future = None
def start_tls(
self,
server_side: bool,
- ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None,
+ ssl_options: Optional[Union[dict[str, Any], ssl.SSLContext]] = None,
server_hostname: Optional[str] = None,
) -> Awaitable["SSLIOStream"]:
"""Convert this `IOStream` to an `SSLIOStream`.
super()._handle_write()
def connect(
- self, address: Tuple, server_hostname: Optional[str] = None
+ self, address: tuple, server_hostname: Optional[str] = None
) -> "Future[SSLIOStream]":
self._server_hostname = server_hostname
# Ignore the result of connect(). If it fails,
def __init__(self) -> None:
self._value = False
- self._waiters: Set[Future[None]] = set()
+ self._waiters: set[Future[None]] = set()
def __repr__(self) -> str:
return "<{} {}>".format(
except ImportError:
curses = None # type: ignore
-from typing import Dict, Any, cast, Optional
+from typing import Any, cast, Optional
# Logger objects for internal tornado use
access_log = logging.getLogger("tornado.access")
datefmt: str = DEFAULT_DATE_FORMAT,
style: str = "%",
color: bool = True,
- colors: Dict[int, int] = DEFAULT_COLORS,
+ colors: dict[int, int] = DEFAULT_COLORS,
) -> None:
r"""
:arg bool color: Enables color support.
logging.Formatter.__init__(self, datefmt=datefmt)
self._fmt = fmt
- self._colors: Dict[int, str] = {}
+ self._colors: dict[int, str] = {}
if color and _stderr_supports_color():
if curses is not None:
fg_color = curses.tigetstr("setaf") or curses.tigetstr("setf") or b""
from tornado.ioloop import IOLoop
from tornado.util import Configurable, errno_from_exception
-from typing import List, Callable, Any, Type, Dict, Union, Tuple, Awaitable, Optional
+from typing import Callable, Any, Union, Awaitable, Optional
# Note that the naming of ssl.Purpose is confusing; the purpose
# of a context is to authenticate the opposite side of the connection.
backlog: int = _DEFAULT_BACKLOG,
flags: Optional[int] = None,
reuse_port: bool = False,
-) -> List[socket.socket]:
+) -> list[socket.socket]:
"""Creates listening sockets bound to the given port and address.
Returns a list of socket objects (multiple sockets are returned if
"""
@classmethod
- def configurable_base(cls) -> Type["Resolver"]:
+ def configurable_base(cls) -> type["Resolver"]:
return Resolver
@classmethod
- def configurable_default(cls) -> Type["Resolver"]:
+ def configurable_default(cls) -> type["Resolver"]:
return DefaultLoopResolver
def resolve(
self, host: str, port: int, family: socket.AddressFamily = socket.AF_UNSPEC
- ) -> Awaitable[List[Tuple[int, Any]]]:
+ ) -> Awaitable[list[tuple[int, Any]]]:
"""Resolves an address.
The ``host`` argument is a string which may be a hostname or a
def _resolve_addr(
host: str, port: int, family: socket.AddressFamily = socket.AF_UNSPEC
-) -> List[Tuple[int, Any]]:
+) -> list[tuple[int, Any]]:
# On Solaris, getaddrinfo fails if the given port is not found
# in /etc/services and no socket type is given, so we must pass
# one here. The socket type used here doesn't seem to actually
async def resolve(
self, host: str, port: int, family: socket.AddressFamily = socket.AF_UNSPEC
- ) -> List[Tuple[int, Any]]:
+ ) -> list[tuple[int, Any]]:
result = await IOLoop.current().run_in_executor(
None, _resolve_addr, host, port, family
)
async def resolve(
self, host: str, port: int, family: socket.AddressFamily = socket.AF_UNSPEC
- ) -> List[Tuple[int, Any]]:
+ ) -> list[tuple[int, Any]]:
# On Solaris, getaddrinfo fails if the given port is not found
# in /etc/services and no socket type is given, so we must pass
# one here. The socket type used here doesn't seem to actually
@run_on_executor
def resolve(
self, host: str, port: int, family: socket.AddressFamily = socket.AF_UNSPEC
- ) -> List[Tuple[int, Any]]:
+ ) -> list[tuple[int, Any]]:
return _resolve_addr(host, port, family)
def resolve(
self, host: str, port: int, family: socket.AddressFamily = socket.AF_UNSPEC
- ) -> Awaitable[List[Tuple[int, Any]]]:
+ ) -> Awaitable[list[tuple[int, Any]]]:
if (host, port, family) in self.mapping:
host, port = self.mapping[(host, port, family)]
elif (host, port) in self.mapping:
def ssl_options_to_context(
- ssl_options: Union[Dict[str, Any], ssl.SSLContext],
+ ssl_options: Union[dict[str, Any], ssl.SSLContext],
server_side: Optional[bool] = None,
) -> ssl.SSLContext:
"""Try to convert an ``ssl_options`` dictionary to an
def ssl_wrap_socket(
socket: socket.socket,
- ssl_options: Union[Dict[str, Any], ssl.SSLContext],
+ ssl_options: Union[dict[str, Any], ssl.SSLContext],
server_hostname: Optional[str] = None,
server_side: Optional[bool] = None,
**kwargs: Any,
Any,
Iterator,
Iterable,
- Tuple,
- Set,
- Dict,
Callable,
- List,
TextIO,
Optional,
)
def __setitem__(self, name: str, value: Any) -> None:
return self.__setattr__(name, value)
- def items(self) -> Iterable[Tuple[str, Any]]:
+ def items(self) -> Iterable[tuple[str, Any]]:
"""An iterable of (name, value) pairs.
.. versionadded:: 3.1
"""
return [(opt.name, opt.value()) for name, opt in self._options.items()]
- def groups(self) -> Set[str]:
+ def groups(self) -> set[str]:
"""The set of option-groups created by ``define``.
.. versionadded:: 3.1
"""
return {opt.group_name for opt in self._options.values()}
- def group_dict(self, group: str) -> Dict[str, Any]:
+ def group_dict(self, group: str) -> dict[str, Any]:
"""The names and values of options in a group.
Useful for copying options into Application settings::
if not group or group == opt.group_name
}
- def as_dict(self) -> Dict[str, Any]:
+ def as_dict(self) -> dict[str, Any]:
"""The names and values of all options.
.. versionadded:: 3.1
self._options[normalized] = option
def parse_command_line(
- self, args: Optional[List[str]] = None, final: bool = True
- ) -> List[str]:
+ self, args: Optional[list[str]] = None, final: bool = True
+ ) -> list[str]:
"""Parses all options given on the command line (defaults to
`sys.argv`).
"""
if args is None:
args = sys.argv
- remaining: List[str] = []
+ remaining: list[str] = []
for i in range(1, len(args)):
# All things after the last option are command line arguments
if not args[i].startswith("-"):
file = sys.stderr
print("Usage: %s [OPTIONS]" % sys.argv[0], file=file)
print("\nOptions:\n", file=file)
- by_group: Dict[str, List[_Option]] = {}
+ by_group: dict[str, list[_Option]] = {}
for option in self._options.values():
by_group.setdefault(option.group_name, []).append(option)
def parse_command_line(
- args: Optional[List[str]] = None, final: bool = True
-) -> List[str]:
+ args: Optional[list[str]] = None, final: bool = True
+) -> list[str]:
"""Parses global options from the command line.
See `OptionParser.parse_command_line`.
from typing import (
Any,
Callable,
- Dict,
- List,
Optional,
Protocol,
- Set,
- Tuple,
TypeVar,
Union,
)
_Ts = TypeVarTuple("_Ts")
# Collection of selector thread event loops to shut down on exit.
-_selector_loops: Set["SelectorThread"] = set()
+_selector_loops: set["SelectorThread"] = set()
def _atexit_callback() -> None:
# doesn't understand dynamic proxies.
self.selector_loop = AddThreadSelectorEventLoop(asyncio_loop) # type: ignore
# Maps fd to (fileobj, handler function) pair (as in IOLoop.add_handler)
- self.handlers: Dict[int, Tuple[Union[int, _Selectable], Callable]] = {}
+ self.handlers: dict[int, tuple[Union[int, _Selectable], Callable]] = {}
# Set of fds listening for reads/writes
- self.readers: Set[int] = set()
- self.writers: Set[int] = set()
+ self.readers: set[int] = set()
+ self.writers: set[int] = set()
self.closing = False
# If an asyncio loop was closed through an asyncio interface
# instead of IOLoop.close(), we'd never hear about it and may
self._select_cond = threading.Condition()
self._select_args: Optional[
- Tuple[List[_FileDescriptorLike], List[_FileDescriptorLike]]
+ tuple[list[_FileDescriptorLike], list[_FileDescriptorLike]]
] = None
self._closing_selector = False
self._thread: Optional[threading.Thread] = None
context=self._main_thread_ctx,
)
- self._readers: Dict[_FileDescriptorLike, Callable] = {}
- self._writers: Dict[_FileDescriptorLike, Callable] = {}
+ self._readers: dict[_FileDescriptorLike, Callable] = {}
+ self._writers: dict[_FileDescriptorLike, Callable] = {}
# Writing to _waker_w will wake up the selector thread, which
# watches for _waker_r to be readable.
pass
def _handle_select(
- self, rs: List[_FileDescriptorLike], ws: List[_FileDescriptorLike]
+ self, rs: list[_FileDescriptorLike], ws: list[_FileDescriptorLike]
) -> None:
for r in rs:
self._handle_event(r, self._readers)
def _handle_event(
self,
fd: _FileDescriptorLike,
- cb_map: Dict[_FileDescriptorLike, Callable],
+ cb_map: dict[_FileDescriptorLike, Callable],
) -> None:
try:
callback = cb_map[fd]
def initialize(self) -> None:
self.io_loop = IOLoop.current()
self.channel = pycares.Channel(sock_state_cb=self._sock_state_cb)
- self.fds: Dict[int, int] = {}
+ self.fds: dict[int, int] = {}
def _sock_state_cb(self, fd: int, readable: bool, writable: bool) -> None:
state = (IOLoop.READ if readable else 0) | (IOLoop.WRITE if writable else 0)
addresses = [host]
else:
# gethostbyname doesn't take callback as a kwarg
- fut: Future[Tuple[Any, Any]] = Future()
+ fut: Future[tuple[Any, Any]] = Future()
self.channel.gethostbyname(
host, family, lambda result, error: fut.set_result((result, error))
)
self.io_loop = ioloop.IOLoop.current()
# All FDs we create should be closed on error; those in to_close
# should be closed in the parent process on success.
- pipe_fds: List[int] = []
- to_close: List[int] = []
+ pipe_fds: list[int] = []
+ to_close: list[int] = []
if kwargs.get("stdin") is Subprocess.STREAM:
in_r, in_w = os.pipe()
kwargs["stdin"] = in_r
Union,
Optional,
Awaitable,
- List,
- Dict,
Pattern,
- Tuple,
overload,
Sequence,
)
_RuleList = Sequence[
Union[
"Rule",
- List[Any], # Can't do detailed typechecking of lists.
- Tuple[Union[str, "Matcher"], Any],
- Tuple[Union[str, "Matcher"], Any, Dict[str, Any]],
- Tuple[Union[str, "Matcher"], Any, Dict[str, Any], str],
+ list[Any], # Can't do detailed typechecking of lists.
+ tuple[Union[str, "Matcher"], Any],
+ tuple[Union[str, "Matcher"], Any, dict[str, Any]],
+ tuple[Union[str, "Matcher"], Any, dict[str, Any], str],
]
]
:arg rules: a list of `Rule` instances or tuples of `Rule`
constructor arguments.
"""
- self.rules: List[Rule] = []
+ self.rules: list[Rule] = []
if rules:
self.add_rules(rules)
"""
def __init__(self, rules: Optional[_RuleList] = None) -> None:
- self.named_rules: Dict[str, Any] = {}
+ self.named_rules: dict[str, Any] = {}
super().__init__(rules)
def process_rule(self, rule: "Rule") -> "Rule":
self,
matcher: "Matcher",
target: Any,
- target_kwargs: Optional[Dict[str, Any]] = None,
+ target_kwargs: Optional[dict[str, Any]] = None,
name: Optional[str] = None,
) -> None:
"""Constructs a Rule instance.
class Matcher:
"""Represents a matcher for request features."""
- def match(self, request: httputil.HTTPServerRequest) -> Optional[Dict[str, Any]]:
+ def match(self, request: httputil.HTTPServerRequest) -> Optional[dict[str, Any]]:
"""Matches current instance against the request.
:arg httputil.HTTPServerRequest request: current HTTP request
class AnyMatches(Matcher):
"""Matches any request."""
- def match(self, request: httputil.HTTPServerRequest) -> Optional[Dict[str, Any]]:
+ def match(self, request: httputil.HTTPServerRequest) -> Optional[dict[str, Any]]:
return {}
else:
self.host_pattern = host_pattern
- def match(self, request: httputil.HTTPServerRequest) -> Optional[Dict[str, Any]]:
+ def match(self, request: httputil.HTTPServerRequest) -> Optional[dict[str, Any]]:
if self.host_pattern.match(request.host_name):
return {}
self.application = application
self.host_pattern = host_pattern
- def match(self, request: httputil.HTTPServerRequest) -> Optional[Dict[str, Any]]:
+ def match(self, request: httputil.HTTPServerRequest) -> Optional[dict[str, Any]]:
# Look for default host if not behind load balancer (for debugging)
if "X-Real-Ip" not in request.headers:
if self.host_pattern.match(self.application.default_host):
self._path, self._group_count = self._find_groups()
- def match(self, request: httputil.HTTPServerRequest) -> Optional[Dict[str, Any]]:
+ def match(self, request: httputil.HTTPServerRequest) -> Optional[dict[str, Any]]:
match = self.regex.match(request.path)
if match is None:
return None
if not self.regex.groups:
return {}
- path_args: List[bytes] = []
- path_kwargs: Dict[str, bytes] = {}
+ path_args: list[bytes] = []
+ path_kwargs: dict[str, bytes] = {}
# Pass matched groups to the handler. Since
# match.groups() includes both named and
converted_args.append(url_escape(utf8(a), plus=False))
return self._path % tuple(converted_args)
- def _find_groups(self) -> Tuple[Optional[str], Optional[int]]:
+ def _find_groups(self) -> tuple[Optional[str], Optional[int]]:
"""Returns a tuple (reverse string, group count) for a url.
For example: Given the url pattern /([0-9]{4})/([a-z-]+)/, this method
self,
pattern: Union[str, Pattern],
handler: Any,
- kwargs: Optional[Dict[str, Any]] = None,
+ kwargs: Optional[dict[str, Any]] = None,
name: Optional[str] = None,
) -> None:
"""Parameters:
from io import BytesIO
import urllib.parse
-from typing import Dict, Any, Callable, Optional, Type, Union, Awaitable
+from typing import Any, Callable, Optional, Type, Union, Awaitable
from types import TracebackType
import typing
def initialize( # type: ignore
self,
max_clients: int = 10,
- hostname_mapping: Optional[Dict[str, str]] = None,
+ hostname_mapping: Optional[dict[str, str]] = None,
max_buffer_size: int = 104857600,
resolver: Optional[Resolver] = None,
- defaults: Optional[Dict[str, Any]] = None,
+ defaults: Optional[dict[str, Any]] = None,
max_header_size: Optional[int] = None,
max_body_size: Optional[int] = None,
) -> None:
super().initialize(defaults=defaults)
self.max_clients = max_clients
self.queue: Deque[
- Tuple[object, HTTPRequest, Callable[[HTTPResponse], None]]
+ tuple[object, HTTPRequest, Callable[[HTTPResponse], None]]
] = collections.deque()
- self.active: Dict[
- object, Tuple[HTTPRequest, Callable[[HTTPResponse], None]]
+ self.active: dict[
+ object, tuple[HTTPRequest, Callable[[HTTPResponse], None]]
] = {}
- self.waiting: Dict[
- object, Tuple[HTTPRequest, Callable[[HTTPResponse], None], object]
+ self.waiting: dict[
+ object, tuple[HTTPRequest, Callable[[HTTPResponse], None], object]
] = {}
self.max_buffer_size = max_buffer_size
self.max_header_size = max_header_size
self.max_body_size = max_body_size
self.code: Optional[int] = None
self.headers: Optional[httputil.HTTPHeaders] = None
- self.chunks: List[bytes] = []
+ self.chunks: list[bytes] = []
self._decompressor = None
# Timeout handle returned by IOLoop.add_timeout
self._timeout: object = None
def _get_ssl_options(
self, scheme: str
- ) -> Union[None, Dict[str, Any], ssl.SSLContext]:
+ ) -> Union[None, dict[str, Any], ssl.SSLContext]:
if scheme == "https":
if self.request.ssl_options is not None:
return self.request.ssl_options
from tornado.netutil import Resolver
from tornado.gen import TimeoutError
-from typing import Any, Union, Dict, Tuple, List, Callable, Iterator, Optional
+from typing import Any, Union, Tuple, Callable, Iterator, Optional
if typing.TYPE_CHECKING:
from typing import Set # noqa(F401)
def __init__(
self,
- addrinfo: List[Tuple],
+ addrinfo: list[tuple],
connect: Callable[
- [socket.AddressFamily, Tuple], Tuple[IOStream, "Future[IOStream]"]
+ [socket.AddressFamily, tuple], tuple[IOStream, "Future[IOStream]"]
],
) -> None:
self.io_loop = IOLoop.current()
self.connect = connect
- self.future: Future[Tuple[socket.AddressFamily, Any, IOStream]] = Future()
+ self.future: Future[tuple[socket.AddressFamily, Any, IOStream]] = Future()
self.timeout: Optional[object] = None
self.connect_timeout: Optional[object] = None
self.last_error: Optional[Exception] = None
self.remaining = len(addrinfo)
self.primary_addrs, self.secondary_addrs = self.split(addrinfo)
- self.streams: Set[IOStream] = set()
+ self.streams: set[IOStream] = set()
@staticmethod
def split(
- addrinfo: List[Tuple],
- ) -> Tuple[
- List[Tuple[socket.AddressFamily, Tuple]],
- List[Tuple[socket.AddressFamily, Tuple]],
+ addrinfo: list[tuple],
+ ) -> tuple[
+ list[tuple[socket.AddressFamily, tuple]],
+ list[tuple[socket.AddressFamily, tuple]],
]:
"""Partition the ``addrinfo`` list by address family.
self.set_connect_timeout(connect_timeout)
return self.future
- def try_connect(self, addrs: Iterator[Tuple[socket.AddressFamily, Tuple]]) -> None:
+ def try_connect(self, addrs: Iterator[tuple[socket.AddressFamily, tuple]]) -> None:
try:
af, addr = next(addrs)
except StopIteration:
def on_connect_done(
self,
- addrs: Iterator[Tuple[socket.AddressFamily, Tuple]],
+ addrs: Iterator[tuple[socket.AddressFamily, tuple]],
af: socket.AddressFamily,
- addr: Tuple,
+ addr: tuple,
future: "Future[IOStream]",
) -> None:
self.remaining -= 1
host: str,
port: int,
af: socket.AddressFamily = socket.AF_UNSPEC,
- ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None,
+ ssl_options: Optional[Union[dict[str, Any], ssl.SSLContext]] = None,
max_buffer_size: Optional[int] = None,
source_ip: Optional[str] = None,
source_port: Optional[int] = None,
self,
max_buffer_size: Optional[int],
af: socket.AddressFamily,
- addr: Tuple,
+ addr: tuple,
source_ip: Optional[str] = None,
source_port: Optional[int] = None,
- ) -> Tuple[IOStream, "Future[IOStream]"]:
+ ) -> tuple[IOStream, "Future[IOStream]"]:
# Always connect in plaintext; we'll convert to ssl if necessary
# after one connection has completed.
source_port_bind = source_port if isinstance(source_port, int) else 0
from tornado.util import errno_from_exception
import typing
-from typing import Union, Dict, Any, Iterable, Optional, Awaitable
+from typing import Union, Any, Iterable, Optional, Awaitable
if typing.TYPE_CHECKING:
from typing import Callable, List # noqa: F401
def __init__(
self,
- ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None,
+ ssl_options: Optional[Union[dict[str, Any], ssl.SSLContext]] = None,
max_buffer_size: Optional[int] = None,
read_chunk_size: Optional[int] = None,
) -> None:
self.ssl_options = ssl_options
- self._sockets: Dict[int, socket.socket] = {}
- self._handlers: Dict[int, Callable[[], None]] = {}
- self._pending_sockets: List[socket.socket] = []
+ self._sockets: dict[int, socket.socket] = {}
+ self._handlers: dict[int, Callable[[], None]] = {}
+ self._pending_sockets: list[socket.socket] = []
self._started = False
self._stopped = False
self.max_buffer_size = max_buffer_size
from tornado.log import app_log
from tornado.util import ObjectDict, exec_in, unicode_type
-from typing import Any, Union, Callable, List, Dict, Iterable, Optional, TextIO
+from typing import Any, Union, Callable, Iterable, Optional, TextIO
import typing
if typing.TYPE_CHECKING:
buffer = StringIO()
try:
# named_blocks maps from names to _NamedBlock objects
- named_blocks: Dict[str, _NamedBlock] = {}
+ named_blocks: dict[str, _NamedBlock] = {}
ancestors = self._get_ancestors(loader)
ancestors.reverse()
for ancestor in ancestors:
finally:
buffer.close()
- def _get_ancestors(self, loader: Optional["BaseLoader"]) -> List["_File"]:
+ def _get_ancestors(self, loader: Optional["BaseLoader"]) -> list["_File"]:
ancestors = [self.file]
for chunk in self.file.body.chunks:
if isinstance(chunk, _ExtendsBlock):
def __init__(
self,
autoescape: Optional[str] = _DEFAULT_AUTOESCAPE,
- namespace: Optional[Dict[str, Any]] = None,
+ namespace: Optional[dict[str, Any]] = None,
whitespace: Optional[str] = None,
) -> None:
"""Construct a template loader.
self.autoescape = autoescape
self.namespace = namespace or {}
self.whitespace = whitespace
- self.templates: Dict[str, Template] = {}
+ self.templates: dict[str, Template] = {}
# self.lock protects self.templates. It's a reentrant lock
# because templates may load other templates via `include` or
# `extends`. Note that thanks to the GIL this code would be safe
class DictLoader(BaseLoader):
"""A template loader that loads from a dictionary."""
- def __init__(self, dict: Dict[str, str], **kwargs: Any) -> None:
+ def __init__(self, dict: dict[str, str], **kwargs: Any) -> None:
super().__init__(**kwargs)
self.dict = dict
raise NotImplementedError()
def find_named_blocks(
- self, loader: Optional[BaseLoader], named_blocks: Dict[str, "_NamedBlock"]
+ self, loader: Optional[BaseLoader], named_blocks: dict[str, "_NamedBlock"]
) -> None:
for child in self.each_child():
child.find_named_blocks(loader, named_blocks)
class _ChunkList(_Node):
- def __init__(self, chunks: List[_Node]) -> None:
+ def __init__(self, chunks: list[_Node]) -> None:
self.chunks = chunks
def generate(self, writer: "_CodeWriter") -> None:
block.body.generate(writer)
def find_named_blocks(
- self, loader: Optional[BaseLoader], named_blocks: Dict[str, "_NamedBlock"]
+ self, loader: Optional[BaseLoader], named_blocks: dict[str, "_NamedBlock"]
) -> None:
named_blocks[self.name] = self
_Node.find_named_blocks(self, loader, named_blocks)
self.line = line
def find_named_blocks(
- self, loader: Optional[BaseLoader], named_blocks: Dict[str, _NamedBlock]
+ self, loader: Optional[BaseLoader], named_blocks: dict[str, _NamedBlock]
) -> None:
assert loader is not None
included = loader.load(self.name, self.template_name)
def __init__(
self,
file: TextIO,
- named_blocks: Dict[str, _NamedBlock],
+ named_blocks: dict[str, _NamedBlock],
loader: Optional[BaseLoader],
current_template: Template,
) -> None:
self.loader = loader
self.current_template = current_template
self.apply_counter = 0
- self.include_stack: List[Tuple[Template, int]] = []
+ self.include_stack: list[tuple[Template, int]] = []
self._indent = 0
def indent_size(self) -> int:
stack_ids.remove(item_id)
visited_ids.add(item_id)
- found: typing.List[object] = []
+ found: list[object] = []
stack = []
stack_ids = set()
garbage_ids = set(map(id, garbage))
from typing import List, Tuple, Union, Dict, Any # noqa: F401
-linkify_tests: List[Tuple[Union[str, bytes], Dict[str, Any], str]] = [
+linkify_tests: list[tuple[Union[str, bytes], dict[str, Any], str]] = [
# (input, linkify_kwargs, expected_output)
(
"hello http://world.com/!",
self.assertEqual(linked, html)
def test_xhtml_escape(self):
- tests: List[Tuple[Union[str, bytes], Union[str, bytes]]] = [
+ tests: list[tuple[Union[str, bytes], Union[str, bytes]]] = [
("<foo>", "<foo>"),
("<foo>", "<foo>"),
(b"<foo>", b"<foo>"),
self.assertEqual(unescaped, xhtml_unescape(escaped))
def test_url_escape_unicode(self):
- tests: List[Tuple[Union[str, bytes], str]] = [
+ tests: list[tuple[Union[str, bytes], str]] = [
# byte strings are passed through as-is
("\u00e9".encode(), "%C3%A9"),
("\u00e9".encode("latin1"), "%E9"),
class UndecoratedCoroutinesHandler(RequestHandler):
@gen.coroutine
def prepare(self):
- self.chunks: List[str] = []
+ self.chunks: list[str] = []
yield gen.moment
self.chunks.append("1")
@gen_test
def test_iterator(self):
- futures: List[Future[int]] = [Future(), Future(), Future(), Future()]
+ futures: list[Future[int]] = [Future(), Future(), Future(), Future()]
self.finish_coroutines(0, futures)
# Recreate the previous test with py35 syntax. It's a little clunky
# because of the way the previous test handles an exception on
# a single iteration.
- futures: List[Future[int]] = [Future(), Future(), Future(), Future()]
+ futures: list[Future[int]] = [Future(), Future(), Future(), Future()]
self.finish_coroutines(0, futures)
self.finished = False
def test_gc(self):
# GitHub issue 1769: Runner objects can get GCed unexpectedly
# while their future is alive.
- weakref_scope: List[Optional[weakref.ReferenceType]] = [None]
+ weakref_scope: list[Optional[weakref.ReferenceType]] = [None]
def callback():
gc.collect(2)
# their loop is closed, even if they're involved in a reference
# cycle.
loop = self.get_new_ioloop()
- result: List[Optional[bool]] = []
+ result: list[Optional[bool]] = []
wfut = []
@gen.coroutine
result.append(None)
loop = self.get_new_ioloop()
- result: List[Optional[bool]] = []
+ result: list[Optional[bool]] = []
wfut = []
@gen.coroutine
def test_streaming_callback(self):
# streaming_callback is also tested in test_chunked
- chunks: typing.List[bytes] = []
+ chunks: list[bytes] = []
response = self.fetch("/hello", streaming_callback=chunks.append)
# with streaming_callback, data goes to the callback and not response.body
self.assertEqual(chunks, [b"Hello world!"])
response = self.fetch("/chunk")
self.assertEqual(response.body, b"asdfqwer")
- chunks: typing.List[bytes] = []
+ chunks: list[bytes] = []
response = self.fetch("/chunk", streaming_callback=chunks.append)
self.assertEqual(chunks, [b"asdf", b"qwer"])
self.assertFalse(response.body)
class TypeCheckHandler(RequestHandler):
def prepare(self):
- self.errors: Dict[str, str] = {}
+ self.errors: dict[str, str] = {}
fields = [
("method", str),
("uri", str),
self.connection = connection
def headers_received(self, start_line, headers):
- self.chunk_lengths: List[int] = []
+ self.chunk_lengths: list[int] = []
def data_received(self, chunk):
self.chunk_lengths.append(len(chunk))
def setUp(self):
super().setUp()
- self.streams: List[IOStream] = []
+ self.streams: list[IOStream] = []
def tearDown(self):
super().tearDown()
import urllib.parse
import unittest
-from typing import Tuple, Dict, List
-
-def form_data_args() -> Tuple[Dict[str, List[bytes]], Dict[str, List[HTTPFile]]]:
+def form_data_args() -> tuple[dict[str, list[bytes]], dict[str, list[HTTPFile]]]:
"""Return two empty dicts suitable for use with parse_multipart_form_data.
mypy insists on type annotations for dict literals, so this lets us avoid
def test_timeout_with_arguments(self):
# This tests that all the timeout methods pass through *args correctly.
- results: List[int] = []
+ results: list[int] = []
self.io_loop.add_timeout(self.io_loop.time(), results.append, 1)
self.io_loop.add_timeout(datetime.timedelta(seconds=0), results.append, 2)
self.io_loop.call_at(self.io_loop.time(), results.append, 3)
class ConditionTest(AsyncTestCase):
def setUp(self):
super().setUp()
- self.history: typing.List[typing.Union[int, str]] = []
+ self.history: list[typing.Union[int, str]] = []
def record_done(self, future, key):
"""Record the resolution of a Future returned by Condition.wait."""
not hasattr(socket, "SO_REUSEPORT"), "SO_REUSEPORT is not supported"
)
def test_reuse_port(self):
- sockets: typing.List[socket.socket] = []
+ sockets: list[socket.socket] = []
sock, port = bind_unused_port(reuse_port=True)
try:
sockets = bind_sockets(port, "127.0.0.1", reuse_port=True)
options.foo = "2"
def test_setattr_with_callback(self):
- values: List[int] = []
+ values: list[int] = []
options = OptionParser()
options.define("foo", default=1, type=int, callback=values.append)
options.foo = 2
self.assertEqual(response.body, b"OK")
-resources: typing.Dict[str, bytes] = {}
+resources: dict[str, bytes] = {}
class GetResource(RequestHandler):
class CustomRouter(ReversibleRouter):
def __init__(self):
super().__init__()
- self.routes: typing.Dict[str, typing.Any] = {}
+ self.routes: dict[str, typing.Any] = {}
def add_routes(self, routes):
self.routes.update(routes)
# simple_httpclient_test, but it fails with the version of libcurl
# available on travis-ci. Move it when that has been upgraded
# or we have a better framework to skip tests based on curl version.
- headers: typing.List[str] = []
- chunk_bytes: typing.List[bytes] = []
+ headers: list[str] = []
+ chunk_bytes: list[bytes] = []
self.fetch(
"/redirect?url=/hello",
header_callback=headers.append,
self.assertEqual(num_start_lines, 1)
def test_streaming_callback_coroutine(self: typing.Any):
- headers: typing.List[str] = []
- chunk_bytes: typing.List[bytes] = []
+ headers: list[str] = []
+ chunk_bytes: list[bytes] = []
import asyncio
class TestTCPServer(TCPServer):
def __init__(self, family):
super().__init__()
- self.streams: List[IOStream] = []
+ self.streams: list[IOStream] = []
self.queue: Queue[IOStream] = Queue()
sockets = bind_sockets(0, "localhost", family)
self.add_sockets(sockets)
def setUp(self):
super().setUp()
- self.connect_futures: Dict[
- Tuple[int, typing.Any], Future[ConnectorTest.FakeStream]
+ self.connect_futures: dict[
+ tuple[int, typing.Any], Future[ConnectorTest.FakeStream]
] = {}
- self.streams: Dict[typing.Any, ConnectorTest.FakeStream] = {}
+ self.streams: dict[typing.Any, ConnectorTest.FakeStream] = {}
self.addrinfo = [(AF1, "a"), (AF1, "b"), (AF2, "c"), (AF2, "d")]
def tearDown(self):
from tornado.test.util import skipIfNonUnix
from tornado.testing import AsyncTestCase, ExpectLog, bind_unused_port, gen_test
-from typing import Tuple
-
class TCPServerTest(AsyncTestCase):
@gen_test
# processes, each of which prints its task id to stdout (a single
# byte, so we don't have to worry about atomicity of the shared
# stdout stream) and then exits.
- def run_subproc(self, code: str) -> Tuple[str, str]:
+ def run_subproc(self, code: str) -> tuple[str, str]:
try:
result = subprocess.run(
[sys.executable, "-Werror::DeprecationWarning"],
from tornado.testing import bind_unused_port
-_TestCaseType = typing.TypeVar("_TestCaseType", bound=typing.Type[unittest.TestCase])
+_TestCaseType = typing.TypeVar("_TestCaseType", bound=type[unittest.TestCase])
skipIfNonUnix = unittest.skipIf(
os.name != "posix" or sys.platform == "cygwin", "non-unix platform"
# globals: it's all global from the perspective of code defined
# in s.
global_namespace = dict(caller_globals, **caller_locals) # type: ignore
- local_namespace: typing.Dict[str, typing.Any] = {}
+ local_namespace: dict[str, typing.Any] = {}
exec(textwrap.dedent(s), global_namespace, local_namespace)
return local_namespace
re_unescape,
)
-from typing import cast, Dict, Any
+from typing import cast, Any
class RaiseExcInfoTest(unittest.TestCase):
def test_omitted(self):
args = (1, 2)
- kwargs: Dict[str, Any] = dict()
+ kwargs: dict[str, Any] = dict()
self.assertIsNone(self.replacer.get_old_value(args, kwargs))
self.assertEqual(
self.replacer.replace("new", args, kwargs),
def test_position(self):
args = (1, 2, "old", 3)
- kwargs: Dict[str, Any] = dict()
+ kwargs: dict[str, Any] = dict()
self.assertEqual(self.replacer.get_old_value(args, kwargs), "old")
self.assertEqual(
self.replacer.replace("new", args, kwargs),
# stub out enough methods to make the signed_cookie functions work
def __init__(self, cookie_secret="0123456789", key_version=None):
# don't call super.__init__
- self._cookies: typing.Dict[str, bytes] = {}
+ self._cookies: dict[str, bytes] = {}
if key_version is None:
self.application = ObjectDict( # type: ignore
settings=dict(cookie_secret=cookie_secret)
class TypeCheckHandler(RequestHandler):
def prepare(self):
- self.errors: typing.Dict[str, str] = {}
+ self.errors: dict[str, str] = {}
self.check_type("status", self.get_status(), int)
def initialize(self, test):
self.test = test
self.method = None
- self.methods: typing.List[str] = []
+ self.methods: list[str] = []
@contextlib.contextmanager
def in_method(self, method):
from tornado.web import Application
import typing
-from typing import Tuple, Any, Callable, Type, Dict, Union, Optional, Coroutine
+from typing import Any, Callable, Type, Union, Optional, Coroutine
from types import TracebackType
if typing.TYPE_CHECKING:
- _ExcInfoTuple = Tuple[
- Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]
+ _ExcInfoTuple = tuple[
+ Optional[type[BaseException]], Optional[BaseException], Optional[TracebackType]
]
def bind_unused_port(
reuse_port: bool = False, address: str = "127.0.0.1"
-) -> Tuple[socket.socket, int]:
+) -> tuple[socket.socket, int]:
"""Binds a server socket to an available port on localhost.
Returns a tuple (socket, port).
return IOLoop(make_current=False)
def _handle_exception(
- self, typ: Type[Exception], value: Exception, tb: TracebackType
+ self, typ: type[Exception], value: Exception, tb: TracebackType
) -> bool:
if self.__failure is None:
self.__failure = (typ, value, tb)
timeout=get_async_test_timeout(),
)
- def get_httpserver_options(self) -> Dict[str, Any]:
+ def get_httpserver_options(self) -> dict[str, Any]:
"""May be overridden by subclasses to return additional
keyword arguments for the server.
"""
def get_http_client(self) -> AsyncHTTPClient:
return AsyncHTTPClient(force_instance=True, defaults=dict(validate_cert=False))
- def get_httpserver_options(self) -> Dict[str, Any]:
+ def get_httpserver_options(self) -> dict[str, Any]:
return dict(ssl_options=self.get_ssl_options())
- def get_ssl_options(self) -> Dict[str, Any]:
+ def get_ssl_options(self) -> dict[str, Any]:
"""May be overridden by subclasses to select SSL options.
By default includes a self-signed testing certificate.
return AsyncHTTPSTestCase.default_ssl_options()
@staticmethod
- def default_ssl_options() -> Dict[str, Any]:
+ def default_ssl_options() -> dict[str, Any]:
# Testing keys were generated with:
# openssl req -new -keyout tornado/test/test.key \
# -out tornado/test/test.crt \
from typing import (
Any,
- Dict,
Mapping,
Match,
Callable,
- Type,
Sequence,
)
TimeoutError = asyncio.TimeoutError
-class ObjectDict(Dict[str, Any]):
+class ObjectDict(dict[str, Any]):
"""Makes a dictionary behave like an object, with attribute-style access."""
def __getattr__(self, name: str) -> Any:
"""
base = cls.configurable_base()
if isinstance(impl, str):
- impl = typing.cast(Type[Configurable], import_object(impl))
+ impl = typing.cast(type[Configurable], import_object(impl))
if impl is not None and not issubclass(impl, cls):
raise ValueError("Invalid subclass of %s" % cls)
base.__impl_class = impl
url = URLSpec
from typing import (
- Dict,
Any,
Union,
Optional,
Awaitable,
- Tuple,
- List,
Callable,
Iterable,
Generator,
# and related methods.
_HeaderTypes = Union[bytes, unicode_type, int, numbers.Integral, datetime.datetime]
-_CookieSecretTypes = Union[str, bytes, Dict[int, str], Dict[int, bytes]]
+_CookieSecretTypes = Union[str, bytes, dict[int, str], dict[int, bytes]]
MIN_SUPPORTED_SIGNED_VALUE_VERSION = 1
"""
- SUPPORTED_METHODS: Tuple[str, ...] = (
+ SUPPORTED_METHODS: tuple[str, ...] = (
"GET",
"HEAD",
"POST",
"OPTIONS",
)
- _template_loaders: Dict[str, template.BaseLoader] = {}
+ _template_loaders: dict[str, template.BaseLoader] = {}
_template_loader_lock = threading.Lock()
_remove_control_chars_regex = re.compile(r"[\x00-\x08\x0e-\x1f]")
_stream_request_body = False
# Will be set in _execute.
- _transforms: List["OutputTransform"]
- path_args: List[str]
- path_kwargs: Dict[str, str]
+ _transforms: list["OutputTransform"]
+ path_args: list[str]
+ path_kwargs: dict[str, str]
def __init__(
self,
"""
@property
- def settings(self) -> Dict[str, Any]:
+ def settings(self) -> dict[str, Any]:
"""An alias for `self.application.settings <Application.settings>`."""
return self.application.settings
}
)
self.set_default_headers()
- self._write_buffer: List[bytes] = []
+ self._write_buffer: list[bytes] = []
self._status_code = 200
self._reason = httputil.responses[200]
"""
return self._get_argument(name, default, self.request.arguments, strip)
- def get_arguments(self, name: str, strip: bool = True) -> List[str]:
+ def get_arguments(self, name: str, strip: bool = True) -> list[str]:
"""Returns a list of the arguments with the given name.
If the argument is not present, returns an empty list.
"""
return self._get_argument(name, default, self.request.body_arguments, strip)
- def get_body_arguments(self, name: str, strip: bool = True) -> List[str]:
+ def get_body_arguments(self, name: str, strip: bool = True) -> list[str]:
"""Returns a list of the body arguments with the given name.
If the argument is not present, returns an empty list.
"""
return self._get_argument(name, default, self.request.query_arguments, strip)
- def get_query_arguments(self, name: str, strip: bool = True) -> List[str]:
+ def get_query_arguments(self, name: str, strip: bool = True) -> list[str]:
"""Returns a list of the query arguments with the given name.
If the argument is not present, returns an empty list.
self,
name: str,
default: Union[None, str, _ArgDefaultMarker],
- source: Dict[str, List[bytes]],
+ source: dict[str, list[bytes]],
strip: bool = True,
) -> Optional[str]:
args = self._get_arguments(name, source, strip=strip)
return args[-1]
def _get_arguments(
- self, name: str, source: Dict[str, List[bytes]], strip: bool = True
- ) -> List[str]:
+ self, name: str, source: dict[str, list[bytes]], strip: bool = True
+ ) -> list[str]:
values = []
for v in source.get(name, []):
s = self.decode_argument(v, name=name)
)
@property
- def cookies(self) -> Dict[str, http.cookies.Morsel]:
+ def cookies(self) -> dict[str, http.cookies.Morsel]:
"""An alias for
`self.request.cookies <.httputil.HTTPServerRequest.cookies>`."""
return self.request.cookies
name: str,
value: Union[str, bytes],
domain: Optional[str] = None,
- expires: Optional[Union[float, Tuple, datetime.datetime]] = None,
+ expires: Optional[Union[float, tuple, datetime.datetime]] = None,
path: str = "/",
expires_days: Optional[float] = None,
# Keyword-only args start here for historical reasons.
Override this method in a sub-classed controller to change the output.
"""
paths = []
- unique_paths: Set[str] = set()
+ unique_paths: set[str] = set()
for path in js_files:
if not is_absolute(path):
Override this method in a sub-classed controller to change the output.
"""
paths = []
- unique_paths: Set[str] = set()
+ unique_paths: set[str] = set()
for path in css_files:
if not is_absolute(path):
namespace.update(kwargs)
return t.generate(**namespace)
- def get_template_namespace(self) -> Dict[str, Any]:
+ def get_template_namespace(self) -> dict[str, Any]:
"""Returns a dictionary to be used as the default template namespace.
May be overridden by subclasses to add or modify values.
self.set_cookie(cookie_name, self._xsrf_token, **cookie_kwargs)
return self._xsrf_token
- def _get_raw_xsrf_token(self) -> Tuple[Optional[int], bytes, float]:
+ def _get_raw_xsrf_token(self) -> tuple[Optional[int], bytes, float]:
"""Read or generate the xsrf token in its raw form.
The raw_xsrf_token is a tuple containing:
def _decode_xsrf_token(
self, cookie: str
- ) -> Tuple[Optional[int], Optional[bytes], Optional[float]]:
+ ) -> tuple[Optional[int], Optional[bytes], Optional[float]]:
"""Convert a cookie string into a the tuple form returned by
_get_raw_xsrf_token.
"""
return match
async def _execute(
- self, transforms: List["OutputTransform"], *args: bytes, **kwargs: bytes
+ self, transforms: list["OutputTransform"], *args: bytes, **kwargs: bytes
) -> None:
"""Executes this request with the given output transforms."""
self._transforms = transforms
exc_info=(typ, value, tb), # type: ignore
)
- def _ui_module(self, name: str, module: Type["UIModule"]) -> Callable[..., str]:
+ def _ui_module(self, name: str, module: type["UIModule"]) -> Callable[..., str]:
def render(*args, **kwargs) -> str: # type: ignore
if not hasattr(self, "_active_modules"):
- self._active_modules: Dict[str, UIModule] = {}
+ self._active_modules: dict[str, UIModule] = {}
if name not in self._active_modules:
self._active_modules[name] = module(self)
rendered = self._active_modules[name].render(*args, **kwargs)
_RequestHandlerType = TypeVar("_RequestHandlerType", bound=RequestHandler)
-def stream_request_body(cls: Type[_RequestHandlerType]) -> Type[_RequestHandlerType]:
+def stream_request_body(cls: type[_RequestHandlerType]) -> type[_RequestHandlerType]:
"""Apply to `RequestHandler` subclasses to enable streaming body support.
This decorator implies the following changes:
return cls
-def _has_stream_request_body(cls: Type[RequestHandler]) -> bool:
+def _has_stream_request_body(cls: type[RequestHandler]) -> bool:
if not issubclass(cls, RequestHandler):
raise TypeError("expected subclass of RequestHandler, got %r", cls)
return cls._stream_request_body
self,
handlers: Optional[_RuleList] = None,
default_host: Optional[str] = None,
- transforms: Optional[List[Type["OutputTransform"]]] = None,
+ transforms: Optional[list[type["OutputTransform"]]] = None,
**settings: Any,
) -> None:
if transforms is None:
- self.transforms: List[Type[OutputTransform]] = []
+ self.transforms: list[type[OutputTransform]] = []
if settings.get("compress_response") or settings.get("gzip"):
self.transforms.append(GZipContentEncoding)
else:
"xsrf_form_html": _xsrf_form_html,
"Template": TemplateModule,
}
- self.ui_methods: Dict[str, Callable[..., str]] = {}
+ self.ui_methods: dict[str, Callable[..., str]] = {}
self._load_ui_modules(settings.get("ui_modules", {}))
self._load_ui_methods(settings.get("ui_methods", {}))
if self.settings.get("static_path"):
[(DefaultHostMatches(self, host_matcher.host_pattern), host_handlers)]
)
- def add_transform(self, transform_class: Type["OutputTransform"]) -> None:
+ def add_transform(self, transform_class: type["OutputTransform"]) -> None:
self.transforms.append(transform_class)
def _load_ui_methods(self, methods: Any) -> None:
def get_handler_delegate(
self,
request: httputil.HTTPServerRequest,
- target_class: Type[RequestHandler],
- target_kwargs: Optional[Dict[str, Any]] = None,
- path_args: Optional[List[bytes]] = None,
- path_kwargs: Optional[Dict[str, bytes]] = None,
+ target_class: type[RequestHandler],
+ target_kwargs: Optional[dict[str, Any]] = None,
+ path_args: Optional[list[bytes]] = None,
+ path_kwargs: Optional[dict[str, bytes]] = None,
) -> "_HandlerDelegate":
"""Returns `~.httputil.HTTPMessageDelegate` that can serve a request
for application and `RequestHandler` subclass.
self,
application: Application,
request: httputil.HTTPServerRequest,
- handler_class: Type[RequestHandler],
- handler_kwargs: Optional[Dict[str, Any]],
- path_args: Optional[List[bytes]],
- path_kwargs: Optional[Dict[str, bytes]],
+ handler_class: type[RequestHandler],
+ handler_kwargs: Optional[dict[str, Any]],
+ path_args: Optional[list[bytes]],
+ path_kwargs: Optional[dict[str, bytes]],
) -> None:
self.application = application
self.connection = request.connection
self.handler_kwargs = handler_kwargs or {}
self.path_args = path_args or []
self.path_kwargs = path_kwargs or {}
- self.chunks: List[bytes] = []
+ self.chunks: list[bytes] = []
self.stream_request_body = _has_stream_request_body(self.handler_class)
def headers_received(
CACHE_MAX_AGE = 86400 * 365 * 10 # 10 years
- _static_hashes: Dict[str, Optional[str]] = {}
+ _static_hashes: dict[str, Optional[str]] = {}
_lock = threading.Lock() # protects _static_hashes
def initialize(self, path: str, default_filename: Optional[str] = None) -> None:
@classmethod
def make_static_url(
- cls, settings: Dict[str, Any], path: str, include_version: bool = True
+ cls, settings: dict[str, Any], path: str, include_version: bool = True
) -> str:
"""Constructs a versioned url for the given path.
return url_path
@classmethod
- def get_version(cls, settings: Dict[str, Any], path: str) -> Optional[str]:
+ def get_version(cls, settings: dict[str, Any], path: str) -> Optional[str]:
"""Generate the version string to be used in static URLs.
``settings`` is the `Application.settings` dictionary and ``path``
headers: httputil.HTTPHeaders,
chunk: bytes,
finishing: bool,
- ) -> Tuple[int, httputil.HTTPHeaders, bytes]:
+ ) -> tuple[int, httputil.HTTPHeaders, bytes]:
return status_code, headers, chunk
def transform_chunk(self, chunk: bytes, finishing: bool) -> bytes:
headers: httputil.HTTPHeaders,
chunk: bytes,
finishing: bool,
- ) -> Tuple[int, httputil.HTTPHeaders, bytes]:
+ ) -> tuple[int, httputil.HTTPHeaders, bytes]:
# TODO: can/should this type be inherited from the superclass?
if "Vary" in headers:
headers["Vary"] += ", Accept-Encoding"
def __init__(self, handler: RequestHandler) -> None:
super().__init__(handler)
# keep resources in both a list and a dict to preserve order
- self._resource_list: List[Dict[str, Any]] = []
- self._resource_dict: Dict[str, Dict[str, Any]] = {}
+ self._resource_list: list[dict[str, Any]] = []
+ self._resource_dict: dict[str, dict[str, Any]] = {}
def render(self, path: str, **kwargs: Any) -> bytes:
def set_resources(**kwargs) -> str: # type: ignore
"""Lazy namespace which creates UIModule proxies bound to a handler."""
def __init__(
- self, handler: RequestHandler, ui_modules: Dict[str, Type[UIModule]]
+ self, handler: RequestHandler, ui_modules: dict[str, type[UIModule]]
) -> None:
self.handler = handler
self.ui_modules = ui_modules
return None
-def _decode_fields_v2(value: bytes) -> Tuple[int, bytes, bytes, bytes, bytes]:
- def _consume_field(s: bytes) -> Tuple[bytes, bytes]:
+def _decode_fields_v2(value: bytes) -> tuple[int, bytes, bytes, bytes, bytes]:
+ def _consume_field(s: bytes) -> tuple[bytes, bytes]:
length, _, rest = s.partition(b":")
n = int(length)
field_value = rest[:n]
cast,
Any,
Optional,
- Dict,
Union,
- List,
Awaitable,
Callable,
- Tuple,
Type,
)
from types import TracebackType
def log_exception(
self,
- typ: Optional[Type[BaseException]],
+ typ: Optional[type[BaseException]],
value: Optional[BaseException],
tb: Optional[TracebackType],
) -> None:
ping_interval: Optional[float] = None,
ping_timeout: Optional[float] = None,
max_message_size: int = _default_max_message_size,
- compression_options: Optional[Dict[str, Any]] = None,
+ compression_options: Optional[dict[str, Any]] = None,
) -> None:
self.ping_interval = ping_interval
self.ping_timeout = ping_timeout
)
def write_message(
- self, message: Union[bytes, str, Dict[str, Any]], binary: bool = False
+ self, message: Union[bytes, str, dict[str, Any]], binary: bool = False
) -> "Future[None]":
"""Sends the given message to the client of this Web Socket.
message = tornado.escape.json_encode(message)
return self.ws_connection.write_message(message, binary=binary)
- def select_subprotocol(self, subprotocols: List[str]) -> Optional[str]:
+ def select_subprotocol(self, subprotocols: list[str]) -> Optional[str]:
"""Override to implement subprotocol negotiation.
``subprotocols`` is a list of strings identifying the
assert self.ws_connection is not None
return self.ws_connection.selected_subprotocol
- def get_compression_options(self) -> Optional[Dict[str, Any]]:
+ def get_compression_options(self) -> Optional[dict[str, Any]]:
"""Override to return compression options for the connection.
If this method returns None (the default), compression will
@abc.abstractmethod
def write_message(
- self, message: Union[str, bytes, Dict[str, Any]], binary: bool = False
+ self, message: Union[str, bytes, dict[str, Any]], binary: bool = False
) -> "Future[None]":
raise NotImplementedError()
self,
persistent: bool,
max_wbits: Optional[int],
- compression_options: Optional[Dict[str, Any]] = None,
+ compression_options: Optional[dict[str, Any]] = None,
) -> None:
if max_wbits is None:
max_wbits = zlib.MAX_WBITS
persistent: bool,
max_wbits: Optional[int],
max_message_size: int,
- compression_options: Optional[Dict[str, Any]] = None,
+ compression_options: Optional[dict[str, Any]] = None,
) -> None:
self._max_message_size = max_message_size
if max_wbits is None:
def _parse_extensions_header(
self, headers: httputil.HTTPHeaders
- ) -> List[Tuple[str, Dict[str, str]]]:
+ ) -> list[tuple[str, dict[str, str]]]:
extensions = headers.get("Sec-WebSocket-Extensions", "")
if extensions:
return [httputil._parse_header(e.strip()) for e in extensions.split(",")]
def _get_compressor_options(
self,
side: str,
- agreed_parameters: Dict[str, Any],
- compression_options: Optional[Dict[str, Any]] = None,
- ) -> Dict[str, Any]:
+ agreed_parameters: dict[str, Any],
+ compression_options: Optional[dict[str, Any]] = None,
+ ) -> dict[str, Any]:
"""Converts a websocket agreed_parameters set to keyword arguments
for our compressor objects.
"""
- options: Dict[str, Any] = dict(
+ options: dict[str, Any] = dict(
persistent=(side + "_no_context_takeover") not in agreed_parameters
)
wbits_header = agreed_parameters.get(side + "_max_window_bits", None)
def _create_compressors(
self,
side: str,
- agreed_parameters: Dict[str, Any],
- compression_options: Optional[Dict[str, Any]] = None,
+ agreed_parameters: dict[str, Any],
+ compression_options: Optional[dict[str, Any]] = None,
) -> None:
# TODO: handle invalid parameters gracefully
allowed_keys = {
return self.stream.write(frame)
def write_message(
- self, message: Union[str, bytes, Dict[str, Any]], binary: bool = False
+ self, message: Union[str, bytes, dict[str, Any]], binary: bool = False
) -> "Future[None]":
"""Sends the given message to the client of this Web Socket."""
if binary:
self,
request: httpclient.HTTPRequest,
on_message_callback: Optional[Callable[[Union[None, str, bytes]], None]] = None,
- compression_options: Optional[Dict[str, Any]] = None,
+ compression_options: Optional[dict[str, Any]] = None,
ping_interval: Optional[float] = None,
ping_timeout: Optional[float] = None,
max_message_size: int = _default_max_message_size,
- subprotocols: Optional[List[str]] = None,
+ subprotocols: Optional[list[str]] = None,
resolver: Optional[Resolver] = None,
) -> None:
self.connect_future: Future[WebSocketClientConnection] = Future()
future_set_result_unless_cancelled(self.connect_future, self)
def write_message(
- self, message: Union[str, bytes, Dict[str, Any]], binary: bool = False
+ self, message: Union[str, bytes, dict[str, Any]], binary: bool = False
) -> "Future[None]":
"""Sends a message to the WebSocket server.
callback: Optional[Callable[["Future[WebSocketClientConnection]"], None]] = None,
connect_timeout: Optional[float] = None,
on_message_callback: Optional[Callable[[Union[None, str, bytes]], None]] = None,
- compression_options: Optional[Dict[str, Any]] = None,
+ compression_options: Optional[dict[str, Any]] = None,
ping_interval: Optional[float] = None,
ping_timeout: Optional[float] = None,
max_message_size: int = _default_max_message_size,
- subprotocols: Optional[List[str]] = None,
+ subprotocols: Optional[list[str]] = None,
resolver: Optional[Resolver] = None,
) -> "Awaitable[WebSocketClientConnection]":
"""Client-side websocket support.
from tornado.ioloop import IOLoop
from tornado.log import access_log
-from typing import List, Tuple, Optional, Callable, Any, Dict
+from typing import Optional, Callable, Any
from types import TracebackType
import typing
IOLoop.current().spawn_callback(self.handle_request, request)
async def handle_request(self, request: httputil.HTTPServerRequest) -> None:
- data: Dict[str, Any] = {}
- response: List[bytes] = []
+ data: dict[str, Any] = {}
+ response: list[bytes] = []
def start_response(
status: str,
- headers: List[Tuple[str, str]],
+ headers: list[tuple[str, str]],
exc_info: Optional[
- Tuple[
+ tuple[
"Optional[Type[BaseException]]",
Optional[BaseException],
Optional[TracebackType],
status_code_str, reason = data["status"].split(" ", 1)
status_code = int(status_code_str)
- headers: List[Tuple[str, str]] = data["headers"]
+ headers: list[tuple[str, str]] = data["headers"]
header_set = {k.lower() for (k, v) in headers}
body = escape.utf8(body)
if status_code != 304:
request.connection.finish()
self._log(status_code, request)
- def environ(self, request: httputil.HTTPServerRequest) -> Dict[str, Any]:
+ def environ(self, request: httputil.HTTPServerRequest) -> dict[str, Any]:
"""Converts a `tornado.httputil.HTTPServerRequest` to a WSGI environment.
.. versionchanged:: 6.3