From: Ben Darnell Date: Thu, 19 Mar 2026 14:22:16 +0000 (-0400) Subject: *: Update type annotations to use Python 3.9 features X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=55ea7ca4a7b33880ffa3cc755fc7d9502d097456;p=thirdparty%2Ftornado.git *: Update type annotations to use Python 3.9 features Automated change with pyupgrade (restricted to the typing plugins) followed by manual removal of unused imports. --- diff --git a/tornado/auth.py b/tornado/auth.py index 89af3fa1..907c260c 100644 --- a/tornado/auth.py +++ b/tornado/auth.py @@ -84,7 +84,7 @@ from tornado.httputil import url_concat from tornado.util import unicode_type from tornado.web import RequestHandler -from typing import List, Any, Dict, cast, Iterable, Union, Optional +from typing import Any, cast, Iterable, Union, Optional class AuthError(Exception): @@ -102,7 +102,7 @@ class OpenIdMixin: def authenticate_redirect( self, callback_uri: Optional[str] = None, - ax_attrs: List[str] = ["name", "email", "language", "username"], + ax_attrs: list[str] = ["name", "email", "language", "username"], ) -> None: """Redirects to the authentication URL for this service. @@ -129,7 +129,7 @@ class OpenIdMixin: async def get_authenticated_user( self, http_client: Optional[httpclient.AsyncHTTPClient] = None - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """Fetches the authenticated user data upon redirect. This method should be called by the handler that receives the @@ -147,7 +147,7 @@ class OpenIdMixin: """ handler = cast(RequestHandler, self) # Verify the OpenID response via direct request to the OP - args: Dict[str, Union[str, bytes]] = { + args: dict[str, Union[str, bytes]] = { k: v[-1] for k, v in handler.request.arguments.items() } args["openid.mode"] = "check_authentication" @@ -164,7 +164,7 @@ class OpenIdMixin: callback_uri: str, ax_attrs: Iterable[str] = [], oauth_scope: Optional[str] = None, - ) -> Dict[str, str]: + ) -> dict[str, str]: handler = cast(RequestHandler, self) url = urllib.parse.urljoin(handler.request.full_url(), callback_uri) args = { @@ -183,7 +183,7 @@ class OpenIdMixin: } ) ax_attrs = set(ax_attrs) - required: List[str] = [] + required: list[str] = [] if "name" in ax_attrs: ax_attrs -= {"name", "firstname", "fullname", "lastname"} required += ["firstname", "fullname", "lastname"] @@ -215,7 +215,7 @@ class OpenIdMixin: def _on_authentication_verified( self, response: httpclient.HTTPResponse - ) -> Dict[str, Any]: + ) -> dict[str, Any]: handler = cast(RequestHandler, self) if b"is_valid:true" not in response.body: raise AuthError("Invalid OpenID response: %r" % response.body) @@ -304,7 +304,7 @@ class OAuthMixin: async def authorize_redirect( self, callback_uri: Optional[str] = None, - extra_params: Optional[Dict[str, Any]] = None, + extra_params: Optional[dict[str, Any]] = None, http_client: Optional[httpclient.AsyncHTTPClient] = None, ) -> None: """Redirects the user to obtain OAuth authorization for this service. @@ -352,7 +352,7 @@ class OAuthMixin: async def get_authenticated_user( self, http_client: Optional[httpclient.AsyncHTTPClient] = None - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """Gets the OAuth authorized user and access token. This method should be called from the handler for your @@ -380,7 +380,7 @@ class OAuthMixin: ) if cookie_key != request_key: raise AuthError("Request token does not match cookie") - token: Dict[str, Union[str, bytes]] = dict(key=cookie_key, secret=cookie_secret) + token: dict[str, Union[str, bytes]] = dict(key=cookie_key, secret=cookie_secret) if oauth_verifier: token["verifier"] = oauth_verifier if http_client is None: @@ -397,7 +397,7 @@ class OAuthMixin: def _oauth_request_token_url( self, callback_uri: Optional[str] = None, - extra_params: Optional[Dict[str, Any]] = None, + extra_params: Optional[dict[str, Any]] = None, ) -> str: handler = cast(RequestHandler, self) consumer_token = self._oauth_consumer_token() @@ -449,7 +449,7 @@ class OAuthMixin: ) handler.redirect(authorize_url + "?" + urllib.parse.urlencode(args)) - def _oauth_access_token_url(self, request_token: Dict[str, Any]) -> str: + def _oauth_access_token_url(self, request_token: dict[str, Any]) -> str: consumer_token = self._oauth_consumer_token() url = self._OAUTH_ACCESS_TOKEN_URL # type: ignore args = dict( @@ -475,7 +475,7 @@ class OAuthMixin: args["oauth_signature"] = signature return url + "?" + urllib.parse.urlencode(args) - def _oauth_consumer_token(self) -> Dict[str, Any]: + def _oauth_consumer_token(self) -> dict[str, Any]: """Subclasses must override this to return their OAuth consumer keys. The return value should be a `dict` with keys ``key`` and ``secret``. @@ -483,8 +483,8 @@ class OAuthMixin: raise NotImplementedError() async def _oauth_get_user_future( - self, access_token: Dict[str, Any] - ) -> Dict[str, Any]: + self, access_token: dict[str, Any] + ) -> dict[str, Any]: """Subclasses must override this to get basic information about the user. @@ -509,10 +509,10 @@ class OAuthMixin: def _oauth_request_parameters( self, url: str, - access_token: Dict[str, Any], - parameters: Dict[str, Any] = {}, + access_token: dict[str, Any], + parameters: dict[str, Any] = {}, method: str = "GET", - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """Returns the OAuth parameters as a dict for the given request. parameters should include all POST arguments and query string arguments @@ -567,8 +567,8 @@ class OAuth2Mixin: redirect_uri: Optional[str] = None, client_id: Optional[str] = None, client_secret: Optional[str] = None, - extra_params: Optional[Dict[str, Any]] = None, - scope: Optional[List[str]] = None, + extra_params: Optional[dict[str, Any]] = None, + scope: Optional[list[str]] = None, response_type: str = "code", ) -> None: """Redirects the user to obtain OAuth authorization for this service. @@ -609,10 +609,10 @@ class OAuth2Mixin: client_id: Optional[str] = None, client_secret: Optional[str] = None, code: Optional[str] = None, - extra_params: Optional[Dict[str, Any]] = None, + extra_params: Optional[dict[str, Any]] = None, ) -> str: url = self._OAUTH_ACCESS_TOKEN_URL # type: ignore - args: Dict[str, str] = {} + args: dict[str, str] = {} if redirect_uri is not None: args["redirect_uri"] = redirect_uri if code is not None: @@ -629,7 +629,7 @@ class OAuth2Mixin: self, url: str, access_token: Optional[str] = None, - post_args: Optional[Dict[str, Any]] = None, + post_args: Optional[dict[str, Any]] = None, **args: Any, ) -> Any: """Fetches the given URL auth an OAuth2 access token. @@ -757,8 +757,8 @@ class TwitterMixin(OAuthMixin): async def twitter_request( self, path: str, - access_token: Dict[str, Any], - post_args: Optional[Dict[str, Any]] = None, + access_token: dict[str, Any], + post_args: Optional[dict[str, Any]] = None, **args: Any, ) -> Any: """Fetches the given API path, e.g., ``statuses/user_timeline/btaylor`` @@ -826,7 +826,7 @@ class TwitterMixin(OAuthMixin): response = await http.fetch(url) return escape.json_decode(response.body) - def _oauth_consumer_token(self) -> Dict[str, Any]: + def _oauth_consumer_token(self) -> dict[str, Any]: handler = cast(RequestHandler, self) handler.require_setting("twitter_consumer_key", "Twitter OAuth") handler.require_setting("twitter_consumer_secret", "Twitter OAuth") @@ -836,8 +836,8 @@ class TwitterMixin(OAuthMixin): ) async def _oauth_get_user_future( - self, access_token: Dict[str, Any] - ) -> Dict[str, Any]: + self, access_token: dict[str, Any] + ) -> dict[str, Any]: user = await self.twitter_request( "/account/verify_credentials", access_token=access_token ) @@ -876,7 +876,7 @@ class GoogleOAuth2Mixin(OAuth2Mixin): _OAUTH_NO_CALLBACKS = False _OAUTH_SETTINGS_KEY = "google_oauth" - def get_google_oauth_settings(self) -> Dict[str, str]: + def get_google_oauth_settings(self) -> dict[str, str]: """Return the Google OAuth 2.0 credentials that you created with [Google Cloud Platform](https://console.cloud.google.com/apis/credentials). The dict @@ -898,7 +898,7 @@ class GoogleOAuth2Mixin(OAuth2Mixin): code: str, client_id: Optional[str] = None, client_secret: Optional[str] = None, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """Handles the login for the Google user, returning an access token. The result is a dictionary containing an ``access_token`` field @@ -990,8 +990,8 @@ class FacebookGraphMixin(OAuth2Mixin): client_id: str, client_secret: str, code: str, - extra_fields: Optional[Dict[str, Any]] = None, - ) -> Optional[Dict[str, Any]]: + extra_fields: Optional[dict[str, Any]] = None, + ) -> Optional[dict[str, Any]]: """Handles the login for the Facebook user, returning a user object. Example usage: @@ -1093,7 +1093,7 @@ class FacebookGraphMixin(OAuth2Mixin): self, path: str, access_token: Optional[str] = None, - post_args: Optional[Dict[str, Any]] = None, + post_args: Optional[dict[str, Any]] = None, **args: Any, ) -> Any: """Fetches the given relative API path, e.g., "/btaylor/picture" @@ -1150,11 +1150,11 @@ class FacebookGraphMixin(OAuth2Mixin): def _oauth_signature( - consumer_token: Dict[str, Any], + consumer_token: dict[str, Any], method: str, url: str, - parameters: Dict[str, Any] = {}, - token: Optional[Dict[str, Any]] = None, + parameters: dict[str, Any] = {}, + token: Optional[dict[str, Any]] = None, ) -> bytes: """Calculates the HMAC-SHA1 OAuth signature for the given request. @@ -1181,11 +1181,11 @@ def _oauth_signature( def _oauth10a_signature( - consumer_token: Dict[str, Any], + consumer_token: dict[str, Any], method: str, url: str, - parameters: Dict[str, Any] = {}, - token: Optional[Dict[str, Any]] = None, + parameters: dict[str, Any] = {}, + token: Optional[dict[str, Any]] = None, ) -> bytes: """Calculates the HMAC-SHA1 OAuth 1.0a signature for the given request. @@ -1219,7 +1219,7 @@ def _oauth_escape(val: Union[str, bytes]) -> str: return urllib.parse.quote(val, safe="~") -def _oauth_parse_response(body: bytes) -> Dict[str, Any]: +def _oauth_parse_response(body: bytes) -> dict[str, Any]: # I can't find an officially-defined encoding for oauth responses and # have never seen anyone use non-ascii. Leave the response in a byte # string for python 2, and use utf8 on python 3. diff --git a/tornado/autoreload.py b/tornado/autoreload.py index c6a6e82d..dd05322e 100644 --- a/tornado/autoreload.py +++ b/tornado/autoreload.py @@ -93,7 +93,7 @@ try: except ImportError: signal = None # type: ignore -from typing import Callable, Dict, Optional, List, Union +from typing import Callable, Optional, Union # os.execv is broken on Windows and can't properly parse command line # arguments and executable name if they contain whitespaces. subprocess @@ -107,7 +107,7 @@ _io_loops: "weakref.WeakKeyDictionary[ioloop.IOLoop, bool]" = ( weakref.WeakKeyDictionary() ) _autoreload_is_main = False -_original_argv: Optional[List[str]] = None +_original_argv: Optional[list[str]] = None _original_spec = None @@ -123,7 +123,7 @@ def start(check_time: int = 500) -> None: _io_loops[io_loop] = True if len(_io_loops) > 1: gen_log.warning("tornado.autoreload started more than once in the same process") - modify_times: Dict[str, float] = {} + modify_times: dict[str, float] = {} callback = functools.partial(_reload_on_update, modify_times) scheduler = ioloop.PeriodicCallback(callback, check_time) scheduler.start() @@ -159,7 +159,7 @@ def add_reload_hook(fn: Callable[[], None]) -> None: _reload_hooks.append(fn) -def _reload_on_update(modify_times: Dict[str, float]) -> None: +def _reload_on_update(modify_times: dict[str, float]) -> None: if _reload_attempted: # We already tried to reload and it didn't work, so don't try again. return @@ -185,7 +185,7 @@ def _reload_on_update(modify_times: Dict[str, float]) -> None: _check_file(modify_times, path) -def _check_file(modify_times: Dict[str, float], path: str) -> None: +def _check_file(modify_times: dict[str, float], path: str) -> None: try: modified = os.stat(path).st_mtime except Exception: diff --git a/tornado/concurrent.py b/tornado/concurrent.py index a48d9053..cf550539 100644 --- a/tornado/concurrent.py +++ b/tornado/concurrent.py @@ -34,7 +34,7 @@ import types from tornado.log import app_log import typing -from typing import Any, Callable, Optional, Tuple, Union +from typing import Any, Callable, Optional, Union _T = typing.TypeVar("_T") @@ -213,7 +213,7 @@ def future_set_exception_unless_cancelled( def future_set_exc_info( future: "Union[futures.Future[_T], Future[_T]]", - exc_info: Tuple[ + exc_info: tuple[ Optional[type], Optional[BaseException], Optional[types.TracebackType] ], ) -> None: diff --git a/tornado/curl_httpclient.py b/tornado/curl_httpclient.py index 95d94cb3..aa898092 100644 --- a/tornado/curl_httpclient.py +++ b/tornado/curl_httpclient.py @@ -39,7 +39,7 @@ from tornado.httpclient import ( ) from tornado.log import app_log -from typing import Dict, Any, Callable, Union, Optional +from typing import Any, Callable, Union, Optional import typing if typing.TYPE_CHECKING: @@ -52,7 +52,7 @@ CR_OR_LF_RE = re.compile(b"\r|\n") class CurlAsyncHTTPClient(AsyncHTTPClient): def initialize( # type: ignore - self, max_clients: int = 10, defaults: Optional[Dict[str, Any]] = None + self, max_clients: int = 10, defaults: Optional[dict[str, Any]] = None ) -> None: super().initialize(defaults=defaults) # Typeshed is incomplete for CurlMulti, so just use Any for now. @@ -62,9 +62,9 @@ class CurlAsyncHTTPClient(AsyncHTTPClient): self._curls = [self._curl_create() for i in range(max_clients)] self._free_list = self._curls[:] self._requests: Deque[ - Tuple[HTTPRequest, Callable[[HTTPResponse], None], float] + tuple[HTTPRequest, Callable[[HTTPResponse], None], float] ] = collections.deque() - self._fds: Dict[int, int] = {} + self._fds: dict[int, int] = {} self._timeout: Optional[object] = None # libcurl has bugs that sometimes cause it to not report all diff --git a/tornado/escape.py b/tornado/escape.py index 8515bf58..c606597e 100644 --- a/tornado/escape.py +++ b/tornado/escape.py @@ -33,7 +33,7 @@ import urllib.parse from tornado.util import unicode_type import typing -from typing import Union, Any, Optional, Dict, List, Callable +from typing import Union, Any, Optional, Callable def xhtml_escape(value: Union[str, bytes]) -> str: @@ -172,7 +172,7 @@ def url_unescape( def parse_qs_bytes( qs: Union[str, bytes], keep_blank_values: bool = False, strict_parsing: bool = False -) -> Dict[str, List[bytes]]: +) -> dict[str, list[bytes]]: """Parses a query string like urlparse.parse_qs, but takes bytes and returns the values as byte strings. @@ -301,7 +301,7 @@ def linkify( shorten: bool = False, extra_params: Union[str, Callable[[str], str]] = "", require_protocol: bool = False, - permitted_protocols: List[str] = ["http", "https"], + permitted_protocols: list[str] = ["http", "https"], ) -> str: """Converts plain text into HTML with links. diff --git a/tornado/gen.py b/tornado/gen.py index 2a580f83..9b218305 100644 --- a/tornado/gen.py +++ b/tornado/gen.py @@ -111,7 +111,7 @@ if typing.TYPE_CHECKING: _T = typing.TypeVar("_T") _Yieldable = Union[ - None, Awaitable, List[Awaitable], Dict[Any, Awaitable], concurrent.futures.Future + None, Awaitable, list[Awaitable], dict[Any, Awaitable], concurrent.futures.Future ] @@ -363,7 +363,7 @@ class WaitIterator: """ - _unfinished: Dict[Future, Union[int, str]] = {} + _unfinished: dict[Future, Union[int, str]] = {} def __init__(self, *args: Future, **kwargs: Future) -> None: if args and kwargs: @@ -439,15 +439,15 @@ class WaitIterator: @overload def multi( children: Sequence[_Yieldable], - quiet_exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]] = (), -) -> Future[List]: ... + quiet_exceptions: Union[type[Exception], tuple[type[Exception], ...]] = (), +) -> Future[list]: ... @overload def multi( children: Mapping[Any, _Yieldable], - quiet_exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]] = (), -) -> Future[Dict]: ... + quiet_exceptions: Union[type[Exception], tuple[type[Exception], ...]] = (), +) -> Future[dict]: ... def multi( @@ -523,7 +523,7 @@ def multi_future( Use `multi` instead. """ if isinstance(children, dict): - keys: Optional[List] = list(children.keys()) + keys: Optional[list] = list(children.keys()) children_seq: Iterable = children.values() else: keys = None @@ -559,7 +559,7 @@ def multi_future( else: future_set_result_unless_cancelled(future, result_list) - listening: Set[Future] = set() + listening: set[Future] = set() for f in children_futs: if f not in listening: listening.add(f) @@ -844,7 +844,7 @@ class Runner: return True def handle_exception( - self, typ: Type[Exception], value: Exception, tb: types.TracebackType + self, typ: type[Exception], value: Exception, tb: types.TracebackType ) -> bool: if not self.running and not self.finished: self.future = Future() diff --git a/tornado/http1connection.py b/tornado/http1connection.py index 21916072..cacc0c7e 100644 --- a/tornado/http1connection.py +++ b/tornado/http1connection.py @@ -36,7 +36,7 @@ from tornado.log import gen_log, app_log from tornado.util import GzipDecompressor -from typing import cast, Optional, Type, Awaitable, Callable, Union, Tuple +from typing import cast, Optional, Type, Awaitable, Callable, Union CR_OR_LF_RE = re.compile(b"\r|\n") @@ -578,7 +578,7 @@ class HTTP1Connection(httputil.HTTPConnection): if not self._finish_future.done(): future_set_result_unless_cancelled(self._finish_future, None) - def _parse_headers(self, data: bytes) -> Tuple[str, httputil.HTTPHeaders]: + def _parse_headers(self, data: bytes) -> tuple[str, httputil.HTTPHeaders]: # The lstrip removes newlines that some implementations sometimes # insert between messages of a reused connection. Per RFC 7230, # we SHOULD ignore at least one empty line before the request. diff --git a/tornado/httpclient.py b/tornado/httpclient.py index 1fb878b6..4c9cf98e 100644 --- a/tornado/httpclient.py +++ b/tornado/httpclient.py @@ -53,7 +53,7 @@ from tornado import gen, httputil from tornado.ioloop import IOLoop from tornado.util import Configurable -from typing import Type, Any, Union, Dict, Callable, Optional, Awaitable, cast +from typing import Type, Any, Union, Callable, Optional, Awaitable, cast class HTTPClient: @@ -176,20 +176,20 @@ class AsyncHTTPClient(Configurable): """ - _instance_cache: Dict[IOLoop, "AsyncHTTPClient"] + _instance_cache: dict[IOLoop, "AsyncHTTPClient"] @classmethod - def configurable_base(cls) -> Type[Configurable]: + def configurable_base(cls) -> type[Configurable]: return AsyncHTTPClient @classmethod - def configurable_default(cls) -> Type[Configurable]: + def configurable_default(cls) -> type[Configurable]: from tornado.simple_httpclient import SimpleAsyncHTTPClient return SimpleAsyncHTTPClient @classmethod - def _async_clients(cls) -> Dict[IOLoop, "AsyncHTTPClient"]: + def _async_clients(cls) -> dict[IOLoop, "AsyncHTTPClient"]: attr_name = "_async_client_dict_" + cls.__name__ if not hasattr(cls, attr_name): setattr(cls, attr_name, weakref.WeakKeyDictionary()) @@ -213,7 +213,7 @@ class AsyncHTTPClient(Configurable): instance_cache[instance.io_loop] = instance return instance - def initialize(self, defaults: Optional[Dict[str, Any]] = None) -> None: + def initialize(self, defaults: Optional[dict[str, Any]] = None) -> None: self.io_loop = IOLoop.current() self.defaults = dict(HTTPRequest._DEFAULTS) if defaults is not None: @@ -339,7 +339,7 @@ class AsyncHTTPClient(Configurable): class HTTPRequest: """HTTP client request object.""" - _headers: Union[Dict[str, str], httputil.HTTPHeaders] + _headers: Union[dict[str, str], httputil.HTTPHeaders] # Default values for HTTPRequest parameters. # Merged with the values on the request object by AsyncHTTPClient @@ -359,7 +359,7 @@ class HTTPRequest: self, url: str, method: str = "GET", - headers: Optional[Union[Dict[str, str], httputil.HTTPHeaders]] = None, + headers: Optional[Union[dict[str, str], httputil.HTTPHeaders]] = None, body: Optional[Union[bytes, str]] = None, auth_username: Optional[str] = None, auth_password: Optional[str] = None, @@ -393,7 +393,7 @@ class HTTPRequest: ] = None, expect_100_continue: bool = False, decompress_response: Optional[bool] = None, - ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None, + ssl_options: Optional[Union[dict[str, Any], ssl.SSLContext]] = None, ) -> None: r"""All parameters except ``url`` are optional. @@ -558,7 +558,7 @@ class HTTPRequest: return self._headers # type: ignore @headers.setter - def headers(self, value: Union[Dict[str, str], httputil.HTTPHeaders]) -> None: + def headers(self, value: Union[dict[str, str], httputil.HTTPHeaders]) -> None: if value is None: self._headers = httputil.HTTPHeaders() else: @@ -637,7 +637,7 @@ class HTTPResponse: effective_url: Optional[str] = None, error: Optional[BaseException] = None, request_time: Optional[float] = None, - time_info: Optional[Dict[str, float]] = None, + time_info: Optional[dict[str, float]] = None, reason: Optional[str] = None, start_time: Optional[float] = None, ) -> None: @@ -741,7 +741,7 @@ class _RequestProxy: """ def __init__( - self, request: HTTPRequest, defaults: Optional[Dict[str, Any]] + self, request: HTTPRequest, defaults: Optional[dict[str, Any]] ) -> None: self.request = request self.defaults = defaults diff --git a/tornado/httpserver.py b/tornado/httpserver.py index 4b8daea6..a952fac8 100644 --- a/tornado/httpserver.py +++ b/tornado/httpserver.py @@ -37,7 +37,7 @@ from tornado.tcpserver import TCPServer from tornado.util import Configurable import typing -from typing import Union, Any, Dict, Callable, List, Type, Tuple, Optional, Awaitable +from typing import Union, Any, Callable, Optional, Awaitable if typing.TYPE_CHECKING: from typing import Set # noqa: F401 @@ -169,7 +169,7 @@ class HTTPServer(TCPServer, Configurable, httputil.HTTPServerConnectionDelegate) ], no_keep_alive: bool = False, xheaders: bool = False, - ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None, + ssl_options: Optional[Union[dict[str, Any], ssl.SSLContext]] = None, protocol: Optional[str] = None, decompress_request: bool = False, chunk_size: Optional[int] = None, @@ -178,7 +178,7 @@ class HTTPServer(TCPServer, Configurable, httputil.HTTPServerConnectionDelegate) body_timeout: Optional[float] = None, max_body_size: Optional[int] = None, max_buffer_size: Optional[int] = None, - trusted_downstream: Optional[List[str]] = None, + trusted_downstream: Optional[list[str]] = None, ) -> None: # This method's signature is not extracted with autodoc # because we want its arguments to appear on the class @@ -202,15 +202,15 @@ class HTTPServer(TCPServer, Configurable, httputil.HTTPServerConnectionDelegate) max_buffer_size=max_buffer_size, read_chunk_size=chunk_size, ) - self._connections: Set[HTTP1ServerConnection] = set() + self._connections: set[HTTP1ServerConnection] = set() self.trusted_downstream = trusted_downstream @classmethod - def configurable_base(cls) -> Type[Configurable]: + def configurable_base(cls) -> type[Configurable]: return HTTPServer @classmethod - def configurable_default(cls) -> Type[Configurable]: + def configurable_default(cls) -> type[Configurable]: return HTTPServer async def close_all_connections(self) -> None: @@ -232,7 +232,7 @@ class HTTPServer(TCPServer, Configurable, httputil.HTTPServerConnectionDelegate) conn = next(iter(self._connections)) await conn.close() - def handle_stream(self, stream: iostream.IOStream, address: Tuple) -> None: + def handle_stream(self, stream: iostream.IOStream, address: tuple) -> None: context = _HTTPRequestContext( stream, address, self.protocol, self.trusted_downstream ) @@ -267,7 +267,7 @@ class _CallableAdapter(httputil.HTTPMessageDelegate): self.request_callback = request_callback self.request: Optional[httputil.HTTPServerRequest] = None self.delegate = None - self._chunks: List[bytes] = [] + self._chunks: list[bytes] = [] def headers_received( self, @@ -299,9 +299,9 @@ class _HTTPRequestContext: def __init__( self, stream: iostream.IOStream, - address: Tuple, + address: tuple, protocol: Optional[str], - trusted_downstream: Optional[List[str]] = None, + trusted_downstream: Optional[list[str]] = None, ) -> None: self.address = address # Save the socket's address family now so we know how to diff --git a/tornado/iostream.py b/tornado/iostream.py index d726bee6..78161b1c 100644 --- a/tornado/iostream.py +++ b/tornado/iostream.py @@ -48,9 +48,7 @@ from typing import ( Callable, Pattern, Any, - Dict, TypeVar, - Tuple, ) from types import TracebackType @@ -122,7 +120,7 @@ class _StreamBuffer: def __init__(self) -> None: # A sequence of (False, bytearray) and (True, memoryview) objects - self._buffers: Deque[Tuple[bool, Union[bytearray, memoryview]]] = ( + self._buffers: Deque[tuple[bool, Union[bytearray, memoryview]]] = ( collections.deque() ) # Position in the first buffer @@ -265,7 +263,7 @@ class BaseIOStream: self._read_partial = False self._read_until_close = False self._read_future: Optional[Future] = None - self._write_futures: Deque[Tuple[int, Future[None]]] = collections.deque() + self._write_futures: Deque[tuple[int, Future[None]]] = collections.deque() self._close_callback: Optional[Callable[[], None]] = None self._connect_future: Optional[Future[IOStream]] = None # _ssl_connect_future should be defined in SSLIOStream @@ -561,7 +559,7 @@ class BaseIOStream: None, bool, BaseException, - Tuple[ + tuple[ "Optional[Type[BaseException]]", Optional[BaseException], Optional[TracebackType], @@ -604,7 +602,7 @@ class BaseIOStream: self._signal_closed() def _signal_closed(self) -> None: - futures: List[Future] = [] + futures: list[Future] = [] if self._read_future is not None: futures.append(self._read_future) self._read_future = None @@ -1192,7 +1190,7 @@ class IOStream(BaseIOStream): def start_tls( self, server_side: bool, - ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None, + ssl_options: Optional[Union[dict[str, Any], ssl.SSLContext]] = None, server_hostname: Optional[str] = None, ) -> Awaitable["SSLIOStream"]: """Convert this `IOStream` to an `SSLIOStream`. @@ -1423,7 +1421,7 @@ class SSLIOStream(IOStream): super()._handle_write() def connect( - self, address: Tuple, server_hostname: Optional[str] = None + self, address: tuple, server_hostname: Optional[str] = None ) -> "Future[SSLIOStream]": self._server_hostname = server_hostname # Ignore the result of connect(). If it fails, diff --git a/tornado/locks.py b/tornado/locks.py index c997cbd2..2bb18eca 100644 --- a/tornado/locks.py +++ b/tornado/locks.py @@ -197,7 +197,7 @@ class Event: def __init__(self) -> None: self._value = False - self._waiters: Set[Future[None]] = set() + self._waiters: set[Future[None]] = set() def __repr__(self) -> str: return "<{} {}>".format( diff --git a/tornado/log.py b/tornado/log.py index fd54144b..dc9fcd2e 100644 --- a/tornado/log.py +++ b/tornado/log.py @@ -45,7 +45,7 @@ try: except ImportError: curses = None # type: ignore -from typing import Dict, Any, cast, Optional +from typing import Any, cast, Optional # Logger objects for internal tornado use access_log = logging.getLogger("tornado.access") @@ -120,7 +120,7 @@ class LogFormatter(logging.Formatter): datefmt: str = DEFAULT_DATE_FORMAT, style: str = "%", color: bool = True, - colors: Dict[int, int] = DEFAULT_COLORS, + colors: dict[int, int] = DEFAULT_COLORS, ) -> None: r""" :arg bool color: Enables color support. @@ -140,7 +140,7 @@ class LogFormatter(logging.Formatter): logging.Formatter.__init__(self, datefmt=datefmt) self._fmt = fmt - self._colors: Dict[int, str] = {} + self._colors: dict[int, str] = {} if color and _stderr_supports_color(): if curses is not None: fg_color = curses.tigetstr("setaf") or curses.tigetstr("setf") or b"" diff --git a/tornado/netutil.py b/tornado/netutil.py index 1741f1b1..c9a8a4cb 100644 --- a/tornado/netutil.py +++ b/tornado/netutil.py @@ -28,7 +28,7 @@ from tornado.concurrent import dummy_executor, run_on_executor from tornado.ioloop import IOLoop from tornado.util import Configurable, errno_from_exception -from typing import List, Callable, Any, Type, Dict, Union, Tuple, Awaitable, Optional +from typing import Callable, Any, Union, Awaitable, Optional # Note that the naming of ssl.Purpose is confusing; the purpose # of a context is to authenticate the opposite side of the connection. @@ -60,7 +60,7 @@ def bind_sockets( backlog: int = _DEFAULT_BACKLOG, flags: Optional[int] = None, reuse_port: bool = False, -) -> List[socket.socket]: +) -> list[socket.socket]: """Creates listening sockets bound to the given port and address. Returns a list of socket objects (multiple sockets are returned if @@ -346,16 +346,16 @@ class Resolver(Configurable): """ @classmethod - def configurable_base(cls) -> Type["Resolver"]: + def configurable_base(cls) -> type["Resolver"]: return Resolver @classmethod - def configurable_default(cls) -> Type["Resolver"]: + def configurable_default(cls) -> type["Resolver"]: return DefaultLoopResolver def resolve( self, host: str, port: int, family: socket.AddressFamily = socket.AF_UNSPEC - ) -> Awaitable[List[Tuple[int, Any]]]: + ) -> Awaitable[list[tuple[int, Any]]]: """Resolves an address. The ``host`` argument is a string which may be a hostname or a @@ -390,7 +390,7 @@ class Resolver(Configurable): def _resolve_addr( host: str, port: int, family: socket.AddressFamily = socket.AF_UNSPEC -) -> List[Tuple[int, Any]]: +) -> list[tuple[int, Any]]: # On Solaris, getaddrinfo fails if the given port is not found # in /etc/services and no socket type is given, so we must pass # one here. The socket type used here doesn't seem to actually @@ -415,7 +415,7 @@ class DefaultExecutorResolver(Resolver): async def resolve( self, host: str, port: int, family: socket.AddressFamily = socket.AF_UNSPEC - ) -> List[Tuple[int, Any]]: + ) -> list[tuple[int, Any]]: result = await IOLoop.current().run_in_executor( None, _resolve_addr, host, port, family ) @@ -427,7 +427,7 @@ class DefaultLoopResolver(Resolver): async def resolve( self, host: str, port: int, family: socket.AddressFamily = socket.AF_UNSPEC - ) -> List[Tuple[int, Any]]: + ) -> list[tuple[int, Any]]: # On Solaris, getaddrinfo fails if the given port is not found # in /etc/services and no socket type is given, so we must pass # one here. The socket type used here doesn't seem to actually @@ -479,7 +479,7 @@ class ExecutorResolver(Resolver): @run_on_executor def resolve( self, host: str, port: int, family: socket.AddressFamily = socket.AF_UNSPEC - ) -> List[Tuple[int, Any]]: + ) -> list[tuple[int, Any]]: return _resolve_addr(host, port, family) @@ -569,7 +569,7 @@ class OverrideResolver(Resolver): def resolve( self, host: str, port: int, family: socket.AddressFamily = socket.AF_UNSPEC - ) -> Awaitable[List[Tuple[int, Any]]]: + ) -> Awaitable[list[tuple[int, Any]]]: if (host, port, family) in self.mapping: host, port = self.mapping[(host, port, family)] elif (host, port) in self.mapping: @@ -588,7 +588,7 @@ _SSL_CONTEXT_KEYWORDS = frozenset( def ssl_options_to_context( - ssl_options: Union[Dict[str, Any], ssl.SSLContext], + ssl_options: Union[dict[str, Any], ssl.SSLContext], server_side: Optional[bool] = None, ) -> ssl.SSLContext: """Try to convert an ``ssl_options`` dictionary to an @@ -642,7 +642,7 @@ def ssl_options_to_context( def ssl_wrap_socket( socket: socket.socket, - ssl_options: Union[Dict[str, Any], ssl.SSLContext], + ssl_options: Union[dict[str, Any], ssl.SSLContext], server_hostname: Optional[str] = None, server_side: Optional[bool] = None, **kwargs: Any, diff --git a/tornado/options.py b/tornado/options.py index 5579c152..fe9f52f7 100644 --- a/tornado/options.py +++ b/tornado/options.py @@ -114,11 +114,7 @@ from typing import ( Any, Iterator, Iterable, - Tuple, - Set, - Dict, Callable, - List, TextIO, Optional, ) @@ -176,21 +172,21 @@ class OptionParser: def __setitem__(self, name: str, value: Any) -> None: return self.__setattr__(name, value) - def items(self) -> Iterable[Tuple[str, Any]]: + def items(self) -> Iterable[tuple[str, Any]]: """An iterable of (name, value) pairs. .. versionadded:: 3.1 """ return [(opt.name, opt.value()) for name, opt in self._options.items()] - def groups(self) -> Set[str]: + def groups(self) -> set[str]: """The set of option-groups created by ``define``. .. versionadded:: 3.1 """ return {opt.group_name for opt in self._options.values()} - def group_dict(self, group: str) -> Dict[str, Any]: + def group_dict(self, group: str) -> dict[str, Any]: """The names and values of options in a group. Useful for copying options into Application settings:: @@ -213,7 +209,7 @@ class OptionParser: if not group or group == opt.group_name } - def as_dict(self) -> Dict[str, Any]: + def as_dict(self) -> dict[str, Any]: """The names and values of all options. .. versionadded:: 3.1 @@ -313,8 +309,8 @@ class OptionParser: self._options[normalized] = option def parse_command_line( - self, args: Optional[List[str]] = None, final: bool = True - ) -> List[str]: + self, args: Optional[list[str]] = None, final: bool = True + ) -> list[str]: """Parses all options given on the command line (defaults to `sys.argv`). @@ -338,7 +334,7 @@ class OptionParser: """ if args is None: args = sys.argv - remaining: List[str] = [] + remaining: list[str] = [] for i in range(1, len(args)): # All things after the last option are command line arguments if not args[i].startswith("-"): @@ -443,7 +439,7 @@ class OptionParser: file = sys.stderr print("Usage: %s [OPTIONS]" % sys.argv[0], file=file) print("\nOptions:\n", file=file) - by_group: Dict[str, List[_Option]] = {} + by_group: dict[str, list[_Option]] = {} for option in self._options.values(): by_group.setdefault(option.group_name, []).append(option) @@ -706,8 +702,8 @@ def define( def parse_command_line( - args: Optional[List[str]] = None, final: bool = True -) -> List[str]: + args: Optional[list[str]] = None, final: bool = True +) -> list[str]: """Parses global options from the command line. See `OptionParser.parse_command_line`. diff --git a/tornado/platform/asyncio.py b/tornado/platform/asyncio.py index a7a2e700..72299bd0 100644 --- a/tornado/platform/asyncio.py +++ b/tornado/platform/asyncio.py @@ -40,12 +40,8 @@ from tornado.ioloop import IOLoop, _Selectable from typing import ( Any, Callable, - Dict, - List, Optional, Protocol, - Set, - Tuple, TypeVar, Union, ) @@ -67,7 +63,7 @@ if typing.TYPE_CHECKING: _Ts = TypeVarTuple("_Ts") # Collection of selector thread event loops to shut down on exit. -_selector_loops: Set["SelectorThread"] = set() +_selector_loops: set["SelectorThread"] = set() def _atexit_callback() -> None: @@ -109,10 +105,10 @@ class BaseAsyncIOLoop(IOLoop): # doesn't understand dynamic proxies. self.selector_loop = AddThreadSelectorEventLoop(asyncio_loop) # type: ignore # Maps fd to (fileobj, handler function) pair (as in IOLoop.add_handler) - self.handlers: Dict[int, Tuple[Union[int, _Selectable], Callable]] = {} + self.handlers: dict[int, tuple[Union[int, _Selectable], Callable]] = {} # Set of fds listening for reads/writes - self.readers: Set[int] = set() - self.writers: Set[int] = set() + self.readers: set[int] = set() + self.writers: set[int] = set() self.closing = False # If an asyncio loop was closed through an asyncio interface # instead of IOLoop.close(), we'd never hear about it and may @@ -479,7 +475,7 @@ class SelectorThread: self._select_cond = threading.Condition() self._select_args: Optional[ - Tuple[List[_FileDescriptorLike], List[_FileDescriptorLike]] + tuple[list[_FileDescriptorLike], list[_FileDescriptorLike]] ] = None self._closing_selector = False self._thread: Optional[threading.Thread] = None @@ -498,8 +494,8 @@ class SelectorThread: context=self._main_thread_ctx, ) - self._readers: Dict[_FileDescriptorLike, Callable] = {} - self._writers: Dict[_FileDescriptorLike, Callable] = {} + self._readers: dict[_FileDescriptorLike, Callable] = {} + self._writers: dict[_FileDescriptorLike, Callable] = {} # Writing to _waker_w will wake up the selector thread, which # watches for _waker_r to be readable. @@ -639,7 +635,7 @@ class SelectorThread: pass def _handle_select( - self, rs: List[_FileDescriptorLike], ws: List[_FileDescriptorLike] + self, rs: list[_FileDescriptorLike], ws: list[_FileDescriptorLike] ) -> None: for r in rs: self._handle_event(r, self._readers) @@ -650,7 +646,7 @@ class SelectorThread: def _handle_event( self, fd: _FileDescriptorLike, - cb_map: Dict[_FileDescriptorLike, Callable], + cb_map: dict[_FileDescriptorLike, Callable], ) -> None: try: callback = cb_map[fd] diff --git a/tornado/platform/caresresolver.py b/tornado/platform/caresresolver.py index 26e6a391..3945b984 100644 --- a/tornado/platform/caresresolver.py +++ b/tornado/platform/caresresolver.py @@ -36,7 +36,7 @@ class CaresResolver(Resolver): def initialize(self) -> None: self.io_loop = IOLoop.current() self.channel = pycares.Channel(sock_state_cb=self._sock_state_cb) - self.fds: Dict[int, int] = {} + self.fds: dict[int, int] = {} def _sock_state_cb(self, fd: int, readable: bool, writable: bool) -> None: state = (IOLoop.READ if readable else 0) | (IOLoop.WRITE if writable else 0) @@ -67,7 +67,7 @@ class CaresResolver(Resolver): addresses = [host] else: # gethostbyname doesn't take callback as a kwarg - fut: Future[Tuple[Any, Any]] = Future() + fut: Future[tuple[Any, Any]] = Future() self.channel.gethostbyname( host, family, lambda result, error: fut.set_result((result, error)) ) diff --git a/tornado/process.py b/tornado/process.py index ab05d157..1ff2c839 100644 --- a/tornado/process.py +++ b/tornado/process.py @@ -214,8 +214,8 @@ class Subprocess: self.io_loop = ioloop.IOLoop.current() # All FDs we create should be closed on error; those in to_close # should be closed in the parent process on success. - pipe_fds: List[int] = [] - to_close: List[int] = [] + pipe_fds: list[int] = [] + to_close: list[int] = [] if kwargs.get("stdin") is Subprocess.STREAM: in_r, in_w = os.pipe() kwargs["stdin"] = in_r diff --git a/tornado/routing.py b/tornado/routing.py index 815dabac..6c6a7166 100644 --- a/tornado/routing.py +++ b/tornado/routing.py @@ -189,10 +189,7 @@ from typing import ( Union, Optional, Awaitable, - List, - Dict, Pattern, - Tuple, overload, Sequence, ) @@ -300,10 +297,10 @@ class _DefaultMessageDelegate(httputil.HTTPMessageDelegate): _RuleList = Sequence[ Union[ "Rule", - List[Any], # Can't do detailed typechecking of lists. - Tuple[Union[str, "Matcher"], Any], - Tuple[Union[str, "Matcher"], Any, Dict[str, Any]], - Tuple[Union[str, "Matcher"], Any, Dict[str, Any], str], + list[Any], # Can't do detailed typechecking of lists. + tuple[Union[str, "Matcher"], Any], + tuple[Union[str, "Matcher"], Any, dict[str, Any]], + tuple[Union[str, "Matcher"], Any, dict[str, Any], str], ] ] @@ -338,7 +335,7 @@ class RuleRouter(Router): :arg rules: a list of `Rule` instances or tuples of `Rule` constructor arguments. """ - self.rules: List[Rule] = [] + self.rules: list[Rule] = [] if rules: self.add_rules(rules) @@ -421,7 +418,7 @@ class ReversibleRuleRouter(ReversibleRouter, RuleRouter): """ def __init__(self, rules: Optional[_RuleList] = None) -> None: - self.named_rules: Dict[str, Any] = {} + self.named_rules: dict[str, Any] = {} super().__init__(rules) def process_rule(self, rule: "Rule") -> "Rule": @@ -456,7 +453,7 @@ class Rule: self, matcher: "Matcher", target: Any, - target_kwargs: Optional[Dict[str, Any]] = None, + target_kwargs: Optional[dict[str, Any]] = None, name: Optional[str] = None, ) -> None: """Constructs a Rule instance. @@ -501,7 +498,7 @@ class Rule: class Matcher: """Represents a matcher for request features.""" - def match(self, request: httputil.HTTPServerRequest) -> Optional[Dict[str, Any]]: + def match(self, request: httputil.HTTPServerRequest) -> Optional[dict[str, Any]]: """Matches current instance against the request. :arg httputil.HTTPServerRequest request: current HTTP request @@ -521,7 +518,7 @@ class Matcher: class AnyMatches(Matcher): """Matches any request.""" - def match(self, request: httputil.HTTPServerRequest) -> Optional[Dict[str, Any]]: + def match(self, request: httputil.HTTPServerRequest) -> Optional[dict[str, Any]]: return {} @@ -536,7 +533,7 @@ class HostMatches(Matcher): else: self.host_pattern = host_pattern - def match(self, request: httputil.HTTPServerRequest) -> Optional[Dict[str, Any]]: + def match(self, request: httputil.HTTPServerRequest) -> Optional[dict[str, Any]]: if self.host_pattern.match(request.host_name): return {} @@ -552,7 +549,7 @@ class DefaultHostMatches(Matcher): self.application = application self.host_pattern = host_pattern - def match(self, request: httputil.HTTPServerRequest) -> Optional[Dict[str, Any]]: + def match(self, request: httputil.HTTPServerRequest) -> Optional[dict[str, Any]]: # Look for default host if not behind load balancer (for debugging) if "X-Real-Ip" not in request.headers: if self.host_pattern.match(self.application.default_host): @@ -578,15 +575,15 @@ class PathMatches(Matcher): self._path, self._group_count = self._find_groups() - def match(self, request: httputil.HTTPServerRequest) -> Optional[Dict[str, Any]]: + def match(self, request: httputil.HTTPServerRequest) -> Optional[dict[str, Any]]: match = self.regex.match(request.path) if match is None: return None if not self.regex.groups: return {} - path_args: List[bytes] = [] - path_kwargs: Dict[str, bytes] = {} + path_args: list[bytes] = [] + path_kwargs: dict[str, bytes] = {} # Pass matched groups to the handler. Since # match.groups() includes both named and @@ -616,7 +613,7 @@ class PathMatches(Matcher): converted_args.append(url_escape(utf8(a), plus=False)) return self._path % tuple(converted_args) - def _find_groups(self) -> Tuple[Optional[str], Optional[int]]: + def _find_groups(self) -> tuple[Optional[str], Optional[int]]: """Returns a tuple (reverse string, group count) for a url. For example: Given the url pattern /([0-9]{4})/([a-z-]+)/, this method @@ -669,7 +666,7 @@ class URLSpec(Rule): self, pattern: Union[str, Pattern], handler: Any, - kwargs: Optional[Dict[str, Any]] = None, + kwargs: Optional[dict[str, Any]] = None, name: Optional[str] = None, ) -> None: """Parameters: diff --git a/tornado/simple_httpclient.py b/tornado/simple_httpclient.py index 522b85a4..9d50ed9e 100644 --- a/tornado/simple_httpclient.py +++ b/tornado/simple_httpclient.py @@ -33,7 +33,7 @@ import time from io import BytesIO import urllib.parse -from typing import Dict, Any, Callable, Optional, Type, Union, Awaitable +from typing import Any, Callable, Optional, Type, Union, Awaitable from types import TracebackType import typing @@ -118,23 +118,23 @@ class SimpleAsyncHTTPClient(AsyncHTTPClient): def initialize( # type: ignore self, max_clients: int = 10, - hostname_mapping: Optional[Dict[str, str]] = None, + hostname_mapping: Optional[dict[str, str]] = None, max_buffer_size: int = 104857600, resolver: Optional[Resolver] = None, - defaults: Optional[Dict[str, Any]] = None, + defaults: Optional[dict[str, Any]] = None, max_header_size: Optional[int] = None, max_body_size: Optional[int] = None, ) -> None: super().initialize(defaults=defaults) self.max_clients = max_clients self.queue: Deque[ - Tuple[object, HTTPRequest, Callable[[HTTPResponse], None]] + tuple[object, HTTPRequest, Callable[[HTTPResponse], None]] ] = collections.deque() - self.active: Dict[ - object, Tuple[HTTPRequest, Callable[[HTTPResponse], None]] + self.active: dict[ + object, tuple[HTTPRequest, Callable[[HTTPResponse], None]] ] = {} - self.waiting: Dict[ - object, Tuple[HTTPRequest, Callable[[HTTPResponse], None], object] + self.waiting: dict[ + object, tuple[HTTPRequest, Callable[[HTTPResponse], None], object] ] = {} self.max_buffer_size = max_buffer_size self.max_header_size = max_header_size @@ -276,7 +276,7 @@ class _HTTPConnection(httputil.HTTPMessageDelegate): self.max_body_size = max_body_size self.code: Optional[int] = None self.headers: Optional[httputil.HTTPHeaders] = None - self.chunks: List[bytes] = [] + self.chunks: list[bytes] = [] self._decompressor = None # Timeout handle returned by IOLoop.add_timeout self._timeout: object = None @@ -448,7 +448,7 @@ class _HTTPConnection(httputil.HTTPMessageDelegate): def _get_ssl_options( self, scheme: str - ) -> Union[None, Dict[str, Any], ssl.SSLContext]: + ) -> Union[None, dict[str, Any], ssl.SSLContext]: if scheme == "https": if self.request.ssl_options is not None: return self.request.ssl_options diff --git a/tornado/tcpclient.py b/tornado/tcpclient.py index 208c8037..31defeb1 100644 --- a/tornado/tcpclient.py +++ b/tornado/tcpclient.py @@ -29,7 +29,7 @@ from tornado import gen from tornado.netutil import Resolver from tornado.gen import TimeoutError -from typing import Any, Union, Dict, Tuple, List, Callable, Iterator, Optional +from typing import Any, Union, Tuple, Callable, Iterator, Optional if typing.TYPE_CHECKING: from typing import Set # noqa(F401) @@ -57,28 +57,28 @@ class _Connector: def __init__( self, - addrinfo: List[Tuple], + addrinfo: list[tuple], connect: Callable[ - [socket.AddressFamily, Tuple], Tuple[IOStream, "Future[IOStream]"] + [socket.AddressFamily, tuple], tuple[IOStream, "Future[IOStream]"] ], ) -> None: self.io_loop = IOLoop.current() self.connect = connect - self.future: Future[Tuple[socket.AddressFamily, Any, IOStream]] = Future() + self.future: Future[tuple[socket.AddressFamily, Any, IOStream]] = Future() self.timeout: Optional[object] = None self.connect_timeout: Optional[object] = None self.last_error: Optional[Exception] = None self.remaining = len(addrinfo) self.primary_addrs, self.secondary_addrs = self.split(addrinfo) - self.streams: Set[IOStream] = set() + self.streams: set[IOStream] = set() @staticmethod def split( - addrinfo: List[Tuple], - ) -> Tuple[ - List[Tuple[socket.AddressFamily, Tuple]], - List[Tuple[socket.AddressFamily, Tuple]], + addrinfo: list[tuple], + ) -> tuple[ + list[tuple[socket.AddressFamily, tuple]], + list[tuple[socket.AddressFamily, tuple]], ]: """Partition the ``addrinfo`` list by address family. @@ -109,7 +109,7 @@ class _Connector: self.set_connect_timeout(connect_timeout) return self.future - def try_connect(self, addrs: Iterator[Tuple[socket.AddressFamily, Tuple]]) -> None: + def try_connect(self, addrs: Iterator[tuple[socket.AddressFamily, tuple]]) -> None: try: af, addr = next(addrs) except StopIteration: @@ -129,9 +129,9 @@ class _Connector: def on_connect_done( self, - addrs: Iterator[Tuple[socket.AddressFamily, Tuple]], + addrs: Iterator[tuple[socket.AddressFamily, tuple]], af: socket.AddressFamily, - addr: Tuple, + addr: tuple, future: "Future[IOStream]", ) -> None: self.remaining -= 1 @@ -220,7 +220,7 @@ class TCPClient: host: str, port: int, af: socket.AddressFamily = socket.AF_UNSPEC, - ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None, + ssl_options: Optional[Union[dict[str, Any], ssl.SSLContext]] = None, max_buffer_size: Optional[int] = None, source_ip: Optional[str] = None, source_port: Optional[int] = None, @@ -295,10 +295,10 @@ class TCPClient: self, max_buffer_size: Optional[int], af: socket.AddressFamily, - addr: Tuple, + addr: tuple, source_ip: Optional[str] = None, source_port: Optional[int] = None, - ) -> Tuple[IOStream, "Future[IOStream]"]: + ) -> tuple[IOStream, "Future[IOStream]"]: # Always connect in plaintext; we'll convert to ssl if necessary # after one connection has completed. source_port_bind = source_port if isinstance(source_port, int) else 0 diff --git a/tornado/tcpserver.py b/tornado/tcpserver.py index 6fcdc471..4d760b45 100644 --- a/tornado/tcpserver.py +++ b/tornado/tcpserver.py @@ -34,7 +34,7 @@ from tornado import process from tornado.util import errno_from_exception import typing -from typing import Union, Dict, Any, Iterable, Optional, Awaitable +from typing import Union, Any, Iterable, Optional, Awaitable if typing.TYPE_CHECKING: from typing import Callable, List # noqa: F401 @@ -123,14 +123,14 @@ class TCPServer: def __init__( self, - ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None, + ssl_options: Optional[Union[dict[str, Any], ssl.SSLContext]] = None, max_buffer_size: Optional[int] = None, read_chunk_size: Optional[int] = None, ) -> None: self.ssl_options = ssl_options - self._sockets: Dict[int, socket.socket] = {} - self._handlers: Dict[int, Callable[[], None]] = {} - self._pending_sockets: List[socket.socket] = [] + self._sockets: dict[int, socket.socket] = {} + self._handlers: dict[int, Callable[[], None]] = {} + self._pending_sockets: list[socket.socket] = [] self._started = False self._stopped = False self.max_buffer_size = max_buffer_size diff --git a/tornado/template.py b/tornado/template.py index 8e78afb8..1a7153ec 100644 --- a/tornado/template.py +++ b/tornado/template.py @@ -208,7 +208,7 @@ from tornado import escape from tornado.log import app_log from tornado.util import ObjectDict, exec_in, unicode_type -from typing import Any, Union, Callable, List, Dict, Iterable, Optional, TextIO +from typing import Any, Union, Callable, Iterable, Optional, TextIO import typing if typing.TYPE_CHECKING: @@ -365,7 +365,7 @@ class Template: buffer = StringIO() try: # named_blocks maps from names to _NamedBlock objects - named_blocks: Dict[str, _NamedBlock] = {} + named_blocks: dict[str, _NamedBlock] = {} ancestors = self._get_ancestors(loader) ancestors.reverse() for ancestor in ancestors: @@ -376,7 +376,7 @@ class Template: finally: buffer.close() - def _get_ancestors(self, loader: Optional["BaseLoader"]) -> List["_File"]: + def _get_ancestors(self, loader: Optional["BaseLoader"]) -> list["_File"]: ancestors = [self.file] for chunk in self.file.body.chunks: if isinstance(chunk, _ExtendsBlock): @@ -400,7 +400,7 @@ class BaseLoader: def __init__( self, autoescape: Optional[str] = _DEFAULT_AUTOESCAPE, - namespace: Optional[Dict[str, Any]] = None, + namespace: Optional[dict[str, Any]] = None, whitespace: Optional[str] = None, ) -> None: """Construct a template loader. @@ -421,7 +421,7 @@ class BaseLoader: self.autoescape = autoescape self.namespace = namespace or {} self.whitespace = whitespace - self.templates: Dict[str, Template] = {} + self.templates: dict[str, Template] = {} # self.lock protects self.templates. It's a reentrant lock # because templates may load other templates via `include` or # `extends`. Note that thanks to the GIL this code would be safe @@ -481,7 +481,7 @@ class Loader(BaseLoader): class DictLoader(BaseLoader): """A template loader that loads from a dictionary.""" - def __init__(self, dict: Dict[str, str], **kwargs: Any) -> None: + def __init__(self, dict: dict[str, str], **kwargs: Any) -> None: super().__init__(**kwargs) self.dict = dict @@ -508,7 +508,7 @@ class _Node: raise NotImplementedError() def find_named_blocks( - self, loader: Optional[BaseLoader], named_blocks: Dict[str, "_NamedBlock"] + self, loader: Optional[BaseLoader], named_blocks: dict[str, "_NamedBlock"] ) -> None: for child in self.each_child(): child.find_named_blocks(loader, named_blocks) @@ -533,7 +533,7 @@ class _File(_Node): class _ChunkList(_Node): - def __init__(self, chunks: List[_Node]) -> None: + def __init__(self, chunks: list[_Node]) -> None: self.chunks = chunks def generate(self, writer: "_CodeWriter") -> None: @@ -560,7 +560,7 @@ class _NamedBlock(_Node): block.body.generate(writer) def find_named_blocks( - self, loader: Optional[BaseLoader], named_blocks: Dict[str, "_NamedBlock"] + self, loader: Optional[BaseLoader], named_blocks: dict[str, "_NamedBlock"] ) -> None: named_blocks[self.name] = self _Node.find_named_blocks(self, loader, named_blocks) @@ -578,7 +578,7 @@ class _IncludeBlock(_Node): self.line = line def find_named_blocks( - self, loader: Optional[BaseLoader], named_blocks: Dict[str, _NamedBlock] + self, loader: Optional[BaseLoader], named_blocks: dict[str, _NamedBlock] ) -> None: assert loader is not None included = loader.load(self.name, self.template_name) @@ -724,7 +724,7 @@ class _CodeWriter: def __init__( self, file: TextIO, - named_blocks: Dict[str, _NamedBlock], + named_blocks: dict[str, _NamedBlock], loader: Optional[BaseLoader], current_template: Template, ) -> None: @@ -733,7 +733,7 @@ class _CodeWriter: self.loader = loader self.current_template = current_template self.apply_counter = 0 - self.include_stack: List[Tuple[Template, int]] = [] + self.include_stack: list[tuple[Template, int]] = [] self._indent = 0 def indent_size(self) -> int: diff --git a/tornado/test/circlerefs_test.py b/tornado/test/circlerefs_test.py index d5f7e969..4e2811a6 100644 --- a/tornado/test/circlerefs_test.py +++ b/tornado/test/circlerefs_test.py @@ -51,7 +51,7 @@ def find_circular_references(garbage): stack_ids.remove(item_id) visited_ids.add(item_id) - found: typing.List[object] = [] + found: list[object] = [] stack = [] stack_ids = set() garbage_ids = set(map(id, garbage)) diff --git a/tornado/test/escape_test.py b/tornado/test/escape_test.py index 5cf9ca8d..572add41 100644 --- a/tornado/test/escape_test.py +++ b/tornado/test/escape_test.py @@ -17,7 +17,7 @@ from tornado.util import unicode_type from typing import List, Tuple, Union, Dict, Any # noqa: F401 -linkify_tests: List[Tuple[Union[str, bytes], Dict[str, Any], str]] = [ +linkify_tests: list[tuple[Union[str, bytes], dict[str, Any], str]] = [ # (input, linkify_kwargs, expected_output) ( "hello http://world.com/!", @@ -218,7 +218,7 @@ class EscapeTestCase(unittest.TestCase): self.assertEqual(linked, html) def test_xhtml_escape(self): - tests: List[Tuple[Union[str, bytes], Union[str, bytes]]] = [ + tests: list[tuple[Union[str, bytes], Union[str, bytes]]] = [ ("", "<foo>"), ("", "<foo>"), (b"", b"<foo>"), @@ -245,7 +245,7 @@ class EscapeTestCase(unittest.TestCase): self.assertEqual(unescaped, xhtml_unescape(escaped)) def test_url_escape_unicode(self): - tests: List[Tuple[Union[str, bytes], str]] = [ + tests: list[tuple[Union[str, bytes], str]] = [ # byte strings are passed through as-is ("\u00e9".encode(), "%C3%A9"), ("\u00e9".encode("latin1"), "%E9"), diff --git a/tornado/test/gen_test.py b/tornado/test/gen_test.py index 6897cb91..b3906a67 100644 --- a/tornado/test/gen_test.py +++ b/tornado/test/gen_test.py @@ -659,7 +659,7 @@ class GenCoroutineUnfinishedSequenceHandler(RequestHandler): class UndecoratedCoroutinesHandler(RequestHandler): @gen.coroutine def prepare(self): - self.chunks: List[str] = [] + self.chunks: list[str] = [] yield gen.moment self.chunks.append("1") @@ -868,7 +868,7 @@ class WaitIteratorTest(AsyncTestCase): @gen_test def test_iterator(self): - futures: List[Future[int]] = [Future(), Future(), Future(), Future()] + futures: list[Future[int]] = [Future(), Future(), Future(), Future()] self.finish_coroutines(0, futures) @@ -897,7 +897,7 @@ class WaitIteratorTest(AsyncTestCase): # Recreate the previous test with py35 syntax. It's a little clunky # because of the way the previous test handles an exception on # a single iteration. - futures: List[Future[int]] = [Future(), Future(), Future(), Future()] + futures: list[Future[int]] = [Future(), Future(), Future(), Future()] self.finish_coroutines(0, futures) self.finished = False @@ -948,7 +948,7 @@ class RunnerGCTest(AsyncTestCase): def test_gc(self): # GitHub issue 1769: Runner objects can get GCed unexpectedly # while their future is alive. - weakref_scope: List[Optional[weakref.ReferenceType]] = [None] + weakref_scope: list[Optional[weakref.ReferenceType]] = [None] def callback(): gc.collect(2) @@ -968,7 +968,7 @@ class RunnerGCTest(AsyncTestCase): # their loop is closed, even if they're involved in a reference # cycle. loop = self.get_new_ioloop() - result: List[Optional[bool]] = [] + result: list[Optional[bool]] = [] wfut = [] @gen.coroutine @@ -1013,7 +1013,7 @@ class RunnerGCTest(AsyncTestCase): result.append(None) loop = self.get_new_ioloop() - result: List[Optional[bool]] = [] + result: list[Optional[bool]] = [] wfut = [] @gen.coroutine diff --git a/tornado/test/httpclient_test.py b/tornado/test/httpclient_test.py index f75ec055..e19db0a1 100644 --- a/tornado/test/httpclient_test.py +++ b/tornado/test/httpclient_test.py @@ -204,7 +204,7 @@ class HTTPClientCommonTestCase(AsyncHTTPTestCase): def test_streaming_callback(self): # streaming_callback is also tested in test_chunked - chunks: typing.List[bytes] = [] + chunks: list[bytes] = [] response = self.fetch("/hello", streaming_callback=chunks.append) # with streaming_callback, data goes to the callback and not response.body self.assertEqual(chunks, [b"Hello world!"]) @@ -219,7 +219,7 @@ class HTTPClientCommonTestCase(AsyncHTTPTestCase): response = self.fetch("/chunk") self.assertEqual(response.body, b"asdfqwer") - chunks: typing.List[bytes] = [] + chunks: list[bytes] = [] response = self.fetch("/chunk", streaming_callback=chunks.append) self.assertEqual(chunks, [b"asdf", b"qwer"]) self.assertFalse(response.body) diff --git a/tornado/test/httpserver_test.py b/tornado/test/httpserver_test.py index 9151e9ca..99777099 100644 --- a/tornado/test/httpserver_test.py +++ b/tornado/test/httpserver_test.py @@ -297,7 +297,7 @@ class EchoHandler(RequestHandler): class TypeCheckHandler(RequestHandler): def prepare(self): - self.errors: Dict[str, str] = {} + self.errors: dict[str, str] = {} fields = [ ("method", str), ("uri", str), @@ -1159,7 +1159,7 @@ class StreamingChunkSizeTest(AsyncHTTPTestCase): self.connection = connection def headers_received(self, start_line, headers): - self.chunk_lengths: List[int] = [] + self.chunk_lengths: list[int] = [] def data_received(self, chunk): self.chunk_lengths.append(len(chunk)) @@ -1310,7 +1310,7 @@ class IdleTimeoutTest(AsyncHTTPTestCase): def setUp(self): super().setUp() - self.streams: List[IOStream] = [] + self.streams: list[IOStream] = [] def tearDown(self): super().tearDown() diff --git a/tornado/test/httputil_test.py b/tornado/test/httputil_test.py index fd5044b9..2924b210 100644 --- a/tornado/test/httputil_test.py +++ b/tornado/test/httputil_test.py @@ -23,10 +23,8 @@ import time import urllib.parse import unittest -from typing import Tuple, Dict, List - -def form_data_args() -> Tuple[Dict[str, List[bytes]], Dict[str, List[HTTPFile]]]: +def form_data_args() -> tuple[dict[str, list[bytes]], dict[str, list[HTTPFile]]]: """Return two empty dicts suitable for use with parse_multipart_form_data. mypy insists on type annotations for dict literals, so this lets us avoid diff --git a/tornado/test/ioloop_test.py b/tornado/test/ioloop_test.py index 1ed2da2b..6555519f 100644 --- a/tornado/test/ioloop_test.py +++ b/tornado/test/ioloop_test.py @@ -225,7 +225,7 @@ class TestIOLoop(AsyncTestCase): def test_timeout_with_arguments(self): # This tests that all the timeout methods pass through *args correctly. - results: List[int] = [] + results: list[int] = [] self.io_loop.add_timeout(self.io_loop.time(), results.append, 1) self.io_loop.add_timeout(datetime.timedelta(seconds=0), results.append, 2) self.io_loop.call_at(self.io_loop.time(), results.append, 3) diff --git a/tornado/test/locks_test.py b/tornado/test/locks_test.py index 44c8c2e2..8dc6f8eb 100644 --- a/tornado/test/locks_test.py +++ b/tornado/test/locks_test.py @@ -23,7 +23,7 @@ from tornado.testing import gen_test, AsyncTestCase class ConditionTest(AsyncTestCase): def setUp(self): super().setUp() - self.history: typing.List[typing.Union[int, str]] = [] + self.history: list[typing.Union[int, str]] = [] def record_done(self, future, key): """Record the resolution of a Future returned by Condition.wait.""" diff --git a/tornado/test/netutil_test.py b/tornado/test/netutil_test.py index 8fb54061..78231d37 100644 --- a/tornado/test/netutil_test.py +++ b/tornado/test/netutil_test.py @@ -199,7 +199,7 @@ class TestPortAllocation(unittest.TestCase): not hasattr(socket, "SO_REUSEPORT"), "SO_REUSEPORT is not supported" ) def test_reuse_port(self): - sockets: typing.List[socket.socket] = [] + sockets: list[socket.socket] = [] sock, port = bind_unused_port(reuse_port=True) try: sockets = bind_sockets(port, "127.0.0.1", reuse_port=True) diff --git a/tornado/test/options_test.py b/tornado/test/options_test.py index 44fd30ab..998ef6ec 100644 --- a/tornado/test/options_test.py +++ b/tornado/test/options_test.py @@ -119,7 +119,7 @@ class OptionsTest(unittest.TestCase): options.foo = "2" def test_setattr_with_callback(self): - values: List[int] = [] + values: list[int] = [] options = OptionParser() options.define("foo", default=1, type=int, callback=values.append) options.foo = 2 diff --git a/tornado/test/routing_test.py b/tornado/test/routing_test.py index 13d08bc8..bf18eac4 100644 --- a/tornado/test/routing_test.py +++ b/tornado/test/routing_test.py @@ -57,7 +57,7 @@ class BasicRouterTestCase(AsyncHTTPTestCase): self.assertEqual(response.body, b"OK") -resources: typing.Dict[str, bytes] = {} +resources: dict[str, bytes] = {} class GetResource(RequestHandler): @@ -116,7 +116,7 @@ SecondHandler = _get_named_handler("second_handler") class CustomRouter(ReversibleRouter): def __init__(self): super().__init__() - self.routes: typing.Dict[str, typing.Any] = {} + self.routes: dict[str, typing.Any] = {} def add_routes(self, routes): self.routes.update(routes) diff --git a/tornado/test/simple_httpclient_test.py b/tornado/test/simple_httpclient_test.py index 697cf6d1..227d4c70 100644 --- a/tornado/test/simple_httpclient_test.py +++ b/tornado/test/simple_httpclient_test.py @@ -524,8 +524,8 @@ class SimpleHTTPClientTestMixin(AsyncTestCase): # simple_httpclient_test, but it fails with the version of libcurl # available on travis-ci. Move it when that has been upgraded # or we have a better framework to skip tests based on curl version. - headers: typing.List[str] = [] - chunk_bytes: typing.List[bytes] = [] + headers: list[str] = [] + chunk_bytes: list[bytes] = [] self.fetch( "/redirect?url=/hello", header_callback=headers.append, @@ -538,8 +538,8 @@ class SimpleHTTPClientTestMixin(AsyncTestCase): self.assertEqual(num_start_lines, 1) def test_streaming_callback_coroutine(self: typing.Any): - headers: typing.List[str] = [] - chunk_bytes: typing.List[bytes] = [] + headers: list[str] = [] + chunk_bytes: list[bytes] = [] import asyncio diff --git a/tornado/test/tcpclient_test.py b/tornado/test/tcpclient_test.py index 022fa481..1730a55f 100644 --- a/tornado/test/tcpclient_test.py +++ b/tornado/test/tcpclient_test.py @@ -40,7 +40,7 @@ AF1, AF2 = 1, 2 class TestTCPServer(TCPServer): def __init__(self, family): super().__init__() - self.streams: List[IOStream] = [] + self.streams: list[IOStream] = [] self.queue: Queue[IOStream] = Queue() sockets = bind_sockets(0, "localhost", family) self.add_sockets(sockets) @@ -203,10 +203,10 @@ class ConnectorTest(AsyncTestCase): def setUp(self): super().setUp() - self.connect_futures: Dict[ - Tuple[int, typing.Any], Future[ConnectorTest.FakeStream] + self.connect_futures: dict[ + tuple[int, typing.Any], Future[ConnectorTest.FakeStream] ] = {} - self.streams: Dict[typing.Any, ConnectorTest.FakeStream] = {} + self.streams: dict[typing.Any, ConnectorTest.FakeStream] = {} self.addrinfo = [(AF1, "a"), (AF1, "b"), (AF2, "c"), (AF2, "d")] def tearDown(self): diff --git a/tornado/test/tcpserver_test.py b/tornado/test/tcpserver_test.py index 173c51c5..6e35d20d 100644 --- a/tornado/test/tcpserver_test.py +++ b/tornado/test/tcpserver_test.py @@ -11,8 +11,6 @@ from tornado.tcpserver import TCPServer from tornado.test.util import skipIfNonUnix from tornado.testing import AsyncTestCase, ExpectLog, bind_unused_port, gen_test -from typing import Tuple - class TCPServerTest(AsyncTestCase): @gen_test @@ -123,7 +121,7 @@ class TestMultiprocess(unittest.TestCase): # processes, each of which prints its task id to stdout (a single # byte, so we don't have to worry about atomicity of the shared # stdout stream) and then exits. - def run_subproc(self, code: str) -> Tuple[str, str]: + def run_subproc(self, code: str) -> tuple[str, str]: try: result = subprocess.run( [sys.executable, "-Werror::DeprecationWarning"], diff --git a/tornado/test/util.py b/tornado/test/util.py index d8d2a973..ca7dcec5 100644 --- a/tornado/test/util.py +++ b/tornado/test/util.py @@ -11,7 +11,7 @@ import warnings from tornado.testing import bind_unused_port -_TestCaseType = typing.TypeVar("_TestCaseType", bound=typing.Type[unittest.TestCase]) +_TestCaseType = typing.TypeVar("_TestCaseType", bound=type[unittest.TestCase]) skipIfNonUnix = unittest.skipIf( os.name != "posix" or sys.platform == "cygwin", "non-unix platform" @@ -80,7 +80,7 @@ def exec_test(caller_globals, caller_locals, s): # globals: it's all global from the perspective of code defined # in s. global_namespace = dict(caller_globals, **caller_locals) # type: ignore - local_namespace: typing.Dict[str, typing.Any] = {} + local_namespace: dict[str, typing.Any] = {} exec(textwrap.dedent(s), global_namespace, local_namespace) return local_namespace diff --git a/tornado/test/util_test.py b/tornado/test/util_test.py index 1d71c039..bcc1ed3c 100644 --- a/tornado/test/util_test.py +++ b/tornado/test/util_test.py @@ -16,7 +16,7 @@ from tornado.util import ( re_unescape, ) -from typing import cast, Dict, Any +from typing import cast, Any class RaiseExcInfoTest(unittest.TestCase): @@ -249,7 +249,7 @@ class ArgReplacerTest(unittest.TestCase): def test_omitted(self): args = (1, 2) - kwargs: Dict[str, Any] = dict() + kwargs: dict[str, Any] = dict() self.assertIsNone(self.replacer.get_old_value(args, kwargs)) self.assertEqual( self.replacer.replace("new", args, kwargs), @@ -258,7 +258,7 @@ class ArgReplacerTest(unittest.TestCase): def test_position(self): args = (1, 2, "old", 3) - kwargs: Dict[str, Any] = dict() + kwargs: dict[str, Any] = dict() self.assertEqual(self.replacer.get_old_value(args, kwargs), "old") self.assertEqual( self.replacer.replace("new", args, kwargs), diff --git a/tornado/test/web_test.py b/tornado/test/web_test.py index 8e04f654..7936116a 100644 --- a/tornado/test/web_test.py +++ b/tornado/test/web_test.py @@ -103,7 +103,7 @@ class CookieTestRequestHandler(RequestHandler): # stub out enough methods to make the signed_cookie functions work def __init__(self, cookie_secret="0123456789", key_version=None): # don't call super.__init__ - self._cookies: typing.Dict[str, bytes] = {} + self._cookies: dict[str, bytes] = {} if key_version is None: self.application = ObjectDict( # type: ignore settings=dict(cookie_secret=cookie_secret) @@ -649,7 +649,7 @@ class RequestEncodingTest(WebTestCase): class TypeCheckHandler(RequestHandler): def prepare(self): - self.errors: typing.Dict[str, str] = {} + self.errors: dict[str, str] = {} self.check_type("status", self.get_status(), int) @@ -2522,7 +2522,7 @@ class BaseFlowControlHandler(RequestHandler): def initialize(self, test): self.test = test self.method = None - self.methods: typing.List[str] = [] + self.methods: list[str] = [] @contextlib.contextmanager def in_method(self, method): diff --git a/tornado/testing.py b/tornado/testing.py index 390dc0e2..e3ac8df5 100644 --- a/tornado/testing.py +++ b/tornado/testing.py @@ -34,12 +34,12 @@ from tornado.util import raise_exc_info, basestring_type from tornado.web import Application import typing -from typing import Tuple, Any, Callable, Type, Dict, Union, Optional, Coroutine +from typing import Any, Callable, Type, Union, Optional, Coroutine from types import TracebackType if typing.TYPE_CHECKING: - _ExcInfoTuple = Tuple[ - Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType] + _ExcInfoTuple = tuple[ + Optional[type[BaseException]], Optional[BaseException], Optional[TracebackType] ] @@ -48,7 +48,7 @@ _NON_OWNED_IOLOOPS = AsyncIOMainLoop def bind_unused_port( reuse_port: bool = False, address: str = "127.0.0.1" -) -> Tuple[socket.socket, int]: +) -> tuple[socket.socket, int]: """Binds a server socket to an available port on localhost. Returns a tuple (socket, port). @@ -222,7 +222,7 @@ class AsyncTestCase(unittest.TestCase): return IOLoop(make_current=False) def _handle_exception( - self, typ: Type[Exception], value: Exception, tb: TracebackType + self, typ: type[Exception], value: Exception, tb: TracebackType ) -> bool: if self.__failure is None: self.__failure = (typ, value, tb) @@ -448,7 +448,7 @@ class AsyncHTTPTestCase(AsyncTestCase): timeout=get_async_test_timeout(), ) - def get_httpserver_options(self) -> Dict[str, Any]: + def get_httpserver_options(self) -> dict[str, Any]: """May be overridden by subclasses to return additional keyword arguments for the server. """ @@ -488,10 +488,10 @@ class AsyncHTTPSTestCase(AsyncHTTPTestCase): def get_http_client(self) -> AsyncHTTPClient: return AsyncHTTPClient(force_instance=True, defaults=dict(validate_cert=False)) - def get_httpserver_options(self) -> Dict[str, Any]: + def get_httpserver_options(self) -> dict[str, Any]: return dict(ssl_options=self.get_ssl_options()) - def get_ssl_options(self) -> Dict[str, Any]: + def get_ssl_options(self) -> dict[str, Any]: """May be overridden by subclasses to select SSL options. By default includes a self-signed testing certificate. @@ -499,7 +499,7 @@ class AsyncHTTPSTestCase(AsyncHTTPTestCase): return AsyncHTTPSTestCase.default_ssl_options() @staticmethod - def default_ssl_options() -> Dict[str, Any]: + def default_ssl_options() -> dict[str, Any]: # Testing keys were generated with: # openssl req -new -keyout tornado/test/test.key \ # -out tornado/test/test.crt \ diff --git a/tornado/util.py b/tornado/util.py index 78a3276d..58595e6b 100644 --- a/tornado/util.py +++ b/tornado/util.py @@ -22,11 +22,9 @@ import zlib from typing import ( Any, - Dict, Mapping, Match, Callable, - Type, Sequence, ) @@ -51,7 +49,7 @@ basestring_type = str TimeoutError = asyncio.TimeoutError -class ObjectDict(Dict[str, Any]): +class ObjectDict(dict[str, Any]): """Makes a dictionary behave like an object, with attribute-style access.""" def __getattr__(self, name: str) -> Any: @@ -296,7 +294,7 @@ class Configurable: """ base = cls.configurable_base() if isinstance(impl, str): - impl = typing.cast(Type[Configurable], import_object(impl)) + impl = typing.cast(type[Configurable], import_object(impl)) if impl is not None and not issubclass(impl, cls): raise ValueError("Invalid subclass of %s" % cls) base.__impl_class = impl diff --git a/tornado/web.py b/tornado/web.py index d63cdcb0..2166b50e 100644 --- a/tornado/web.py +++ b/tornado/web.py @@ -107,13 +107,10 @@ from tornado.util import ObjectDict, unicode_type, _websocket_mask url = URLSpec from typing import ( - Dict, Any, Union, Optional, Awaitable, - Tuple, - List, Callable, Iterable, Generator, @@ -133,7 +130,7 @@ if typing.TYPE_CHECKING: # and related methods. _HeaderTypes = Union[bytes, unicode_type, int, numbers.Integral, datetime.datetime] -_CookieSecretTypes = Union[str, bytes, Dict[int, str], Dict[int, bytes]] +_CookieSecretTypes = Union[str, bytes, dict[int, str], dict[int, bytes]] MIN_SUPPORTED_SIGNED_VALUE_VERSION = 1 @@ -188,7 +185,7 @@ class RequestHandler: """ - SUPPORTED_METHODS: Tuple[str, ...] = ( + SUPPORTED_METHODS: tuple[str, ...] = ( "GET", "HEAD", "POST", @@ -198,16 +195,16 @@ class RequestHandler: "OPTIONS", ) - _template_loaders: Dict[str, template.BaseLoader] = {} + _template_loaders: dict[str, template.BaseLoader] = {} _template_loader_lock = threading.Lock() _remove_control_chars_regex = re.compile(r"[\x00-\x08\x0e-\x1f]") _stream_request_body = False # Will be set in _execute. - _transforms: List["OutputTransform"] - path_args: List[str] - path_kwargs: Dict[str, str] + _transforms: list["OutputTransform"] + path_args: list[str] + path_kwargs: dict[str, str] def __init__( self, @@ -265,7 +262,7 @@ class RequestHandler: """ @property - def settings(self) -> Dict[str, Any]: + def settings(self) -> dict[str, Any]: """An alias for `self.application.settings `.""" return self.application.settings @@ -340,7 +337,7 @@ class RequestHandler: } ) self.set_default_headers() - self._write_buffer: List[bytes] = [] + self._write_buffer: list[bytes] = [] self._status_code = 200 self._reason = httputil.responses[200] @@ -475,7 +472,7 @@ class RequestHandler: """ return self._get_argument(name, default, self.request.arguments, strip) - def get_arguments(self, name: str, strip: bool = True) -> List[str]: + def get_arguments(self, name: str, strip: bool = True) -> list[str]: """Returns a list of the arguments with the given name. If the argument is not present, returns an empty list. @@ -525,7 +522,7 @@ class RequestHandler: """ return self._get_argument(name, default, self.request.body_arguments, strip) - def get_body_arguments(self, name: str, strip: bool = True) -> List[str]: + def get_body_arguments(self, name: str, strip: bool = True) -> list[str]: """Returns a list of the body arguments with the given name. If the argument is not present, returns an empty list. @@ -569,7 +566,7 @@ class RequestHandler: """ return self._get_argument(name, default, self.request.query_arguments, strip) - def get_query_arguments(self, name: str, strip: bool = True) -> List[str]: + def get_query_arguments(self, name: str, strip: bool = True) -> list[str]: """Returns a list of the query arguments with the given name. If the argument is not present, returns an empty list. @@ -582,7 +579,7 @@ class RequestHandler: self, name: str, default: Union[None, str, _ArgDefaultMarker], - source: Dict[str, List[bytes]], + source: dict[str, list[bytes]], strip: bool = True, ) -> Optional[str]: args = self._get_arguments(name, source, strip=strip) @@ -593,8 +590,8 @@ class RequestHandler: return args[-1] def _get_arguments( - self, name: str, source: Dict[str, List[bytes]], strip: bool = True - ) -> List[str]: + self, name: str, source: dict[str, list[bytes]], strip: bool = True + ) -> list[str]: values = [] for v in source.get(name, []): s = self.decode_argument(v, name=name) @@ -628,7 +625,7 @@ class RequestHandler: ) @property - def cookies(self) -> Dict[str, http.cookies.Morsel]: + def cookies(self) -> dict[str, http.cookies.Morsel]: """An alias for `self.request.cookies <.httputil.HTTPServerRequest.cookies>`.""" return self.request.cookies @@ -659,7 +656,7 @@ class RequestHandler: name: str, value: Union[str, bytes], domain: Optional[str] = None, - expires: Optional[Union[float, Tuple, datetime.datetime]] = None, + expires: Optional[Union[float, tuple, datetime.datetime]] = None, path: str = "/", expires_days: Optional[float] = None, # Keyword-only args start here for historical reasons. @@ -1088,7 +1085,7 @@ class RequestHandler: Override this method in a sub-classed controller to change the output. """ paths = [] - unique_paths: Set[str] = set() + unique_paths: set[str] = set() for path in js_files: if not is_absolute(path): @@ -1123,7 +1120,7 @@ class RequestHandler: Override this method in a sub-classed controller to change the output. """ paths = [] - unique_paths: Set[str] = set() + unique_paths: set[str] = set() for path in css_files: if not is_absolute(path): @@ -1172,7 +1169,7 @@ class RequestHandler: namespace.update(kwargs) return t.generate(**namespace) - def get_template_namespace(self) -> Dict[str, Any]: + def get_template_namespace(self) -> dict[str, Any]: """Returns a dictionary to be used as the default template namespace. May be overridden by subclasses to add or modify values. @@ -1590,7 +1587,7 @@ class RequestHandler: self.set_cookie(cookie_name, self._xsrf_token, **cookie_kwargs) return self._xsrf_token - def _get_raw_xsrf_token(self) -> Tuple[Optional[int], bytes, float]: + def _get_raw_xsrf_token(self) -> tuple[Optional[int], bytes, float]: """Read or generate the xsrf token in its raw form. The raw_xsrf_token is a tuple containing: @@ -1619,7 +1616,7 @@ class RequestHandler: def _decode_xsrf_token( self, cookie: str - ) -> Tuple[Optional[int], Optional[bytes], Optional[float]]: + ) -> tuple[Optional[int], Optional[bytes], Optional[float]]: """Convert a cookie string into a the tuple form returned by _get_raw_xsrf_token. """ @@ -1825,7 +1822,7 @@ class RequestHandler: return match async def _execute( - self, transforms: List["OutputTransform"], *args: bytes, **kwargs: bytes + self, transforms: list["OutputTransform"], *args: bytes, **kwargs: bytes ) -> None: """Executes this request with the given output transforms.""" self._transforms = transforms @@ -1969,10 +1966,10 @@ class RequestHandler: exc_info=(typ, value, tb), # type: ignore ) - def _ui_module(self, name: str, module: Type["UIModule"]) -> Callable[..., str]: + def _ui_module(self, name: str, module: type["UIModule"]) -> Callable[..., str]: def render(*args, **kwargs) -> str: # type: ignore if not hasattr(self, "_active_modules"): - self._active_modules: Dict[str, UIModule] = {} + self._active_modules: dict[str, UIModule] = {} if name not in self._active_modules: self._active_modules[name] = module(self) rendered = self._active_modules[name].render(*args, **kwargs) @@ -1997,7 +1994,7 @@ class RequestHandler: _RequestHandlerType = TypeVar("_RequestHandlerType", bound=RequestHandler) -def stream_request_body(cls: Type[_RequestHandlerType]) -> Type[_RequestHandlerType]: +def stream_request_body(cls: type[_RequestHandlerType]) -> type[_RequestHandlerType]: """Apply to `RequestHandler` subclasses to enable streaming body support. This decorator implies the following changes: @@ -2024,7 +2021,7 @@ def stream_request_body(cls: Type[_RequestHandlerType]) -> Type[_RequestHandlerT return cls -def _has_stream_request_body(cls: Type[RequestHandler]) -> bool: +def _has_stream_request_body(cls: type[RequestHandler]) -> bool: if not issubclass(cls, RequestHandler): raise TypeError("expected subclass of RequestHandler, got %r", cls) return cls._stream_request_body @@ -2212,11 +2209,11 @@ class Application(ReversibleRouter): self, handlers: Optional[_RuleList] = None, default_host: Optional[str] = None, - transforms: Optional[List[Type["OutputTransform"]]] = None, + transforms: Optional[list[type["OutputTransform"]]] = None, **settings: Any, ) -> None: if transforms is None: - self.transforms: List[Type[OutputTransform]] = [] + self.transforms: list[type[OutputTransform]] = [] if settings.get("compress_response") or settings.get("gzip"): self.transforms.append(GZipContentEncoding) else: @@ -2228,7 +2225,7 @@ class Application(ReversibleRouter): "xsrf_form_html": _xsrf_form_html, "Template": TemplateModule, } - self.ui_methods: Dict[str, Callable[..., str]] = {} + self.ui_methods: dict[str, Callable[..., str]] = {} self._load_ui_modules(settings.get("ui_modules", {})) self._load_ui_methods(settings.get("ui_methods", {})) if self.settings.get("static_path"): @@ -2324,7 +2321,7 @@ class Application(ReversibleRouter): [(DefaultHostMatches(self, host_matcher.host_pattern), host_handlers)] ) - def add_transform(self, transform_class: Type["OutputTransform"]) -> None: + def add_transform(self, transform_class: type["OutputTransform"]) -> None: self.transforms.append(transform_class) def _load_ui_methods(self, methods: Any) -> None: @@ -2383,10 +2380,10 @@ class Application(ReversibleRouter): def get_handler_delegate( self, request: httputil.HTTPServerRequest, - target_class: Type[RequestHandler], - target_kwargs: Optional[Dict[str, Any]] = None, - path_args: Optional[List[bytes]] = None, - path_kwargs: Optional[Dict[str, bytes]] = None, + target_class: type[RequestHandler], + target_kwargs: Optional[dict[str, Any]] = None, + path_args: Optional[list[bytes]] = None, + path_kwargs: Optional[dict[str, bytes]] = None, ) -> "_HandlerDelegate": """Returns `~.httputil.HTTPMessageDelegate` that can serve a request for application and `RequestHandler` subclass. @@ -2448,10 +2445,10 @@ class _HandlerDelegate(httputil.HTTPMessageDelegate): self, application: Application, request: httputil.HTTPServerRequest, - handler_class: Type[RequestHandler], - handler_kwargs: Optional[Dict[str, Any]], - path_args: Optional[List[bytes]], - path_kwargs: Optional[Dict[str, bytes]], + handler_class: type[RequestHandler], + handler_kwargs: Optional[dict[str, Any]], + path_args: Optional[list[bytes]], + path_kwargs: Optional[dict[str, bytes]], ) -> None: self.application = application self.connection = request.connection @@ -2460,7 +2457,7 @@ class _HandlerDelegate(httputil.HTTPMessageDelegate): self.handler_kwargs = handler_kwargs or {} self.path_args = path_args or [] self.path_kwargs = path_kwargs or {} - self.chunks: List[bytes] = [] + self.chunks: list[bytes] = [] self.stream_request_body = _has_stream_request_body(self.handler_class) def headers_received( @@ -2771,7 +2768,7 @@ class StaticFileHandler(RequestHandler): CACHE_MAX_AGE = 86400 * 365 * 10 # 10 years - _static_hashes: Dict[str, Optional[str]] = {} + _static_hashes: dict[str, Optional[str]] = {} _lock = threading.Lock() # protects _static_hashes def initialize(self, path: str, default_filename: Optional[str] = None) -> None: @@ -3156,7 +3153,7 @@ class StaticFileHandler(RequestHandler): @classmethod def make_static_url( - cls, settings: Dict[str, Any], path: str, include_version: bool = True + cls, settings: dict[str, Any], path: str, include_version: bool = True ) -> str: """Constructs a versioned url for the given path. @@ -3200,7 +3197,7 @@ class StaticFileHandler(RequestHandler): return url_path @classmethod - def get_version(cls, settings: Dict[str, Any], path: str) -> Optional[str]: + def get_version(cls, settings: dict[str, Any], path: str) -> Optional[str]: """Generate the version string to be used in static URLs. ``settings`` is the `Application.settings` dictionary and ``path`` @@ -3277,7 +3274,7 @@ class OutputTransform: headers: httputil.HTTPHeaders, chunk: bytes, finishing: bool, - ) -> Tuple[int, httputil.HTTPHeaders, bytes]: + ) -> tuple[int, httputil.HTTPHeaders, bytes]: return status_code, headers, chunk def transform_chunk(self, chunk: bytes, finishing: bool) -> bytes: @@ -3329,7 +3326,7 @@ class GZipContentEncoding(OutputTransform): headers: httputil.HTTPHeaders, chunk: bytes, finishing: bool, - ) -> Tuple[int, httputil.HTTPHeaders, bytes]: + ) -> tuple[int, httputil.HTTPHeaders, bytes]: # TODO: can/should this type be inherited from the superclass? if "Vary" in headers: headers["Vary"] += ", Accept-Encoding" @@ -3506,8 +3503,8 @@ class TemplateModule(UIModule): def __init__(self, handler: RequestHandler) -> None: super().__init__(handler) # keep resources in both a list and a dict to preserve order - self._resource_list: List[Dict[str, Any]] = [] - self._resource_dict: Dict[str, Dict[str, Any]] = {} + self._resource_list: list[dict[str, Any]] = [] + self._resource_dict: dict[str, dict[str, Any]] = {} def render(self, path: str, **kwargs: Any) -> bytes: def set_resources(**kwargs) -> str: # type: ignore @@ -3562,7 +3559,7 @@ class _UIModuleNamespace: """Lazy namespace which creates UIModule proxies bound to a handler.""" def __init__( - self, handler: RequestHandler, ui_modules: Dict[str, Type[UIModule]] + self, handler: RequestHandler, ui_modules: dict[str, type[UIModule]] ) -> None: self.handler = handler self.ui_modules = ui_modules @@ -3733,8 +3730,8 @@ def _decode_signed_value_v1( return None -def _decode_fields_v2(value: bytes) -> Tuple[int, bytes, bytes, bytes, bytes]: - def _consume_field(s: bytes) -> Tuple[bytes, bytes]: +def _decode_fields_v2(value: bytes) -> tuple[int, bytes, bytes, bytes, bytes]: + def _consume_field(s: bytes) -> tuple[bytes, bytes]: length, _, rest = s.partition(b":") n = int(length) field_value = rest[:n] diff --git a/tornado/websocket.py b/tornado/websocket.py index dd3aef82..b732b73b 100644 --- a/tornado/websocket.py +++ b/tornado/websocket.py @@ -42,12 +42,9 @@ from typing import ( cast, Any, Optional, - Dict, Union, - List, Awaitable, Callable, - Tuple, Type, ) from types import TracebackType @@ -92,7 +89,7 @@ if TYPE_CHECKING: def log_exception( self, - typ: Optional[Type[BaseException]], + typ: Optional[type[BaseException]], value: Optional[BaseException], tb: Optional[TracebackType], ) -> None: @@ -128,7 +125,7 @@ class _WebSocketParams: ping_interval: Optional[float] = None, ping_timeout: Optional[float] = None, max_message_size: int = _default_max_message_size, - compression_options: Optional[Dict[str, Any]] = None, + compression_options: Optional[dict[str, Any]] = None, ) -> None: self.ping_interval = ping_interval self.ping_timeout = ping_timeout @@ -335,7 +332,7 @@ class WebSocketHandler(tornado.web.RequestHandler): ) def write_message( - self, message: Union[bytes, str, Dict[str, Any]], binary: bool = False + self, message: Union[bytes, str, dict[str, Any]], binary: bool = False ) -> "Future[None]": """Sends the given message to the client of this Web Socket. @@ -364,7 +361,7 @@ class WebSocketHandler(tornado.web.RequestHandler): message = tornado.escape.json_encode(message) return self.ws_connection.write_message(message, binary=binary) - def select_subprotocol(self, subprotocols: List[str]) -> Optional[str]: + def select_subprotocol(self, subprotocols: list[str]) -> Optional[str]: """Override to implement subprotocol negotiation. ``subprotocols`` is a list of strings identifying the @@ -398,7 +395,7 @@ class WebSocketHandler(tornado.web.RequestHandler): assert self.ws_connection is not None return self.ws_connection.selected_subprotocol - def get_compression_options(self) -> Optional[Dict[str, Any]]: + def get_compression_options(self) -> Optional[dict[str, Any]]: """Override to return compression options for the connection. If this method returns None (the default), compression will @@ -697,7 +694,7 @@ class WebSocketProtocol(abc.ABC): @abc.abstractmethod def write_message( - self, message: Union[str, bytes, Dict[str, Any]], binary: bool = False + self, message: Union[str, bytes, dict[str, Any]], binary: bool = False ) -> "Future[None]": raise NotImplementedError() @@ -738,7 +735,7 @@ class _PerMessageDeflateCompressor: self, persistent: bool, max_wbits: Optional[int], - compression_options: Optional[Dict[str, Any]] = None, + compression_options: Optional[dict[str, Any]] = None, ) -> None: if max_wbits is None: max_wbits = zlib.MAX_WBITS @@ -787,7 +784,7 @@ class _PerMessageDeflateDecompressor: persistent: bool, max_wbits: Optional[int], max_message_size: int, - compression_options: Optional[Dict[str, Any]] = None, + compression_options: Optional[dict[str, Any]] = None, ) -> None: self._max_message_size = max_message_size if max_wbits is None: @@ -976,7 +973,7 @@ class WebSocketProtocol13(WebSocketProtocol): def _parse_extensions_header( self, headers: httputil.HTTPHeaders - ) -> List[Tuple[str, Dict[str, str]]]: + ) -> list[tuple[str, dict[str, str]]]: extensions = headers.get("Sec-WebSocket-Extensions", "") if extensions: return [httputil._parse_header(e.strip()) for e in extensions.split(",")] @@ -1006,13 +1003,13 @@ class WebSocketProtocol13(WebSocketProtocol): def _get_compressor_options( self, side: str, - agreed_parameters: Dict[str, Any], - compression_options: Optional[Dict[str, Any]] = None, - ) -> Dict[str, Any]: + agreed_parameters: dict[str, Any], + compression_options: Optional[dict[str, Any]] = None, + ) -> dict[str, Any]: """Converts a websocket agreed_parameters set to keyword arguments for our compressor objects. """ - options: Dict[str, Any] = dict( + options: dict[str, Any] = dict( persistent=(side + "_no_context_takeover") not in agreed_parameters ) wbits_header = agreed_parameters.get(side + "_max_window_bits", None) @@ -1026,8 +1023,8 @@ class WebSocketProtocol13(WebSocketProtocol): def _create_compressors( self, side: str, - agreed_parameters: Dict[str, Any], - compression_options: Optional[Dict[str, Any]] = None, + agreed_parameters: dict[str, Any], + compression_options: Optional[dict[str, Any]] = None, ) -> None: # TODO: handle invalid parameters gracefully allowed_keys = { @@ -1084,7 +1081,7 @@ class WebSocketProtocol13(WebSocketProtocol): return self.stream.write(frame) def write_message( - self, message: Union[str, bytes, Dict[str, Any]], binary: bool = False + self, message: Union[str, bytes, dict[str, Any]], binary: bool = False ) -> "Future[None]": """Sends the given message to the client of this Web Socket.""" if binary: @@ -1400,11 +1397,11 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection): self, request: httpclient.HTTPRequest, on_message_callback: Optional[Callable[[Union[None, str, bytes]], None]] = None, - compression_options: Optional[Dict[str, Any]] = None, + compression_options: Optional[dict[str, Any]] = None, ping_interval: Optional[float] = None, ping_timeout: Optional[float] = None, max_message_size: int = _default_max_message_size, - subprotocols: Optional[List[str]] = None, + subprotocols: Optional[list[str]] = None, resolver: Optional[Resolver] = None, ) -> None: self.connect_future: Future[WebSocketClientConnection] = Future() @@ -1537,7 +1534,7 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection): future_set_result_unless_cancelled(self.connect_future, self) def write_message( - self, message: Union[str, bytes, Dict[str, Any]], binary: bool = False + self, message: Union[str, bytes, dict[str, Any]], binary: bool = False ) -> "Future[None]": """Sends a message to the WebSocket server. @@ -1637,11 +1634,11 @@ def websocket_connect( callback: Optional[Callable[["Future[WebSocketClientConnection]"], None]] = None, connect_timeout: Optional[float] = None, on_message_callback: Optional[Callable[[Union[None, str, bytes]], None]] = None, - compression_options: Optional[Dict[str, Any]] = None, + compression_options: Optional[dict[str, Any]] = None, ping_interval: Optional[float] = None, ping_timeout: Optional[float] = None, max_message_size: int = _default_max_message_size, - subprotocols: Optional[List[str]] = None, + subprotocols: Optional[list[str]] = None, resolver: Optional[Resolver] = None, ) -> "Awaitable[WebSocketClientConnection]": """Client-side websocket support. diff --git a/tornado/wsgi.py b/tornado/wsgi.py index 8af8607a..f0837b18 100644 --- a/tornado/wsgi.py +++ b/tornado/wsgi.py @@ -38,7 +38,7 @@ from tornado import httputil from tornado.ioloop import IOLoop from tornado.log import access_log -from typing import List, Tuple, Optional, Callable, Any, Dict +from typing import Optional, Callable, Any from types import TracebackType import typing @@ -134,14 +134,14 @@ class WSGIContainer: IOLoop.current().spawn_callback(self.handle_request, request) async def handle_request(self, request: httputil.HTTPServerRequest) -> None: - data: Dict[str, Any] = {} - response: List[bytes] = [] + data: dict[str, Any] = {} + response: list[bytes] = [] def start_response( status: str, - headers: List[Tuple[str, str]], + headers: list[tuple[str, str]], exc_info: Optional[ - Tuple[ + tuple[ "Optional[Type[BaseException]]", Optional[BaseException], Optional[TracebackType], @@ -184,7 +184,7 @@ class WSGIContainer: status_code_str, reason = data["status"].split(" ", 1) status_code = int(status_code_str) - headers: List[Tuple[str, str]] = data["headers"] + headers: list[tuple[str, str]] = data["headers"] header_set = {k.lower() for (k, v) in headers} body = escape.utf8(body) if status_code != 304: @@ -204,7 +204,7 @@ class WSGIContainer: request.connection.finish() self._log(status_code, request) - def environ(self, request: httputil.HTTPServerRequest) -> Dict[str, Any]: + def environ(self, request: httputil.HTTPServerRequest) -> dict[str, Any]: """Converts a `tornado.httputil.HTTPServerRequest` to a WSGI environment. .. versionchanged:: 6.3