from tornado.util import unicode_type
from tornado.web import RequestHandler
-from typing import Any, cast, Union, Optional
+from typing import Any, cast
from collections.abc import Iterable
def authenticate_redirect(
self,
- callback_uri: Optional[str] = None,
+ callback_uri: str | None = None,
ax_attrs: list[str] = ["name", "email", "language", "username"],
) -> None:
"""Redirects to the authentication URL for this service.
handler.redirect(endpoint + "?" + urllib.parse.urlencode(args))
async def get_authenticated_user(
- self, http_client: Optional[httpclient.AsyncHTTPClient] = None
+ self, http_client: httpclient.AsyncHTTPClient | None = None
) -> dict[str, Any]:
"""Fetches the authenticated user data upon redirect.
"""
handler = cast(RequestHandler, self)
# Verify the OpenID response via direct request to the OP
- args: dict[str, Union[str, bytes]] = {
+ args: dict[str, str | bytes] = {
k: v[-1] for k, v in handler.request.arguments.items()
}
args["openid.mode"] = "check_authentication"
self,
callback_uri: str,
ax_attrs: Iterable[str] = [],
- oauth_scope: Optional[str] = None,
+ oauth_scope: str | None = None,
) -> dict[str, str]:
handler = cast(RequestHandler, self)
url = urllib.parse.urljoin(handler.request.full_url(), callback_uri)
async def authorize_redirect(
self,
- callback_uri: Optional[str] = None,
- extra_params: Optional[dict[str, Any]] = None,
- http_client: Optional[httpclient.AsyncHTTPClient] = None,
+ callback_uri: str | None = None,
+ extra_params: dict[str, Any] | None = None,
+ http_client: httpclient.AsyncHTTPClient | None = None,
) -> None:
"""Redirects the user to obtain OAuth authorization for this service.
self._on_request_token(url, callback_uri, response)
async def get_authenticated_user(
- self, http_client: Optional[httpclient.AsyncHTTPClient] = None
+ self, http_client: httpclient.AsyncHTTPClient | None = None
) -> dict[str, Any]:
"""Gets the OAuth authorized user and access token.
)
if cookie_key != request_key:
raise AuthError("Request token does not match cookie")
- token: dict[str, Union[str, bytes]] = dict(key=cookie_key, secret=cookie_secret)
+ token: dict[str, str | bytes] = dict(key=cookie_key, secret=cookie_secret)
if oauth_verifier:
token["verifier"] = oauth_verifier
if http_client is None:
def _oauth_request_token_url(
self,
- callback_uri: Optional[str] = None,
- extra_params: Optional[dict[str, Any]] = None,
+ callback_uri: str | None = None,
+ extra_params: dict[str, Any] | None = None,
) -> str:
handler = cast(RequestHandler, self)
consumer_token = self._oauth_consumer_token()
def _on_request_token(
self,
authorize_url: str,
- callback_uri: Optional[str],
+ callback_uri: str | None,
response: httpclient.HTTPResponse,
) -> None:
handler = cast(RequestHandler, self)
def authorize_redirect(
self,
- redirect_uri: Optional[str] = None,
- client_id: Optional[str] = None,
- client_secret: Optional[str] = None,
- extra_params: Optional[dict[str, Any]] = None,
- scope: Optional[list[str]] = None,
+ redirect_uri: str | None = None,
+ client_id: str | None = None,
+ client_secret: str | None = None,
+ extra_params: dict[str, Any] | None = None,
+ scope: list[str] | None = None,
response_type: str = "code",
) -> None:
"""Redirects the user to obtain OAuth authorization for this service.
def _oauth_request_token_url(
self,
- redirect_uri: Optional[str] = None,
- client_id: Optional[str] = None,
- client_secret: Optional[str] = None,
- code: Optional[str] = None,
- extra_params: Optional[dict[str, Any]] = None,
+ redirect_uri: str | None = None,
+ client_id: str | None = None,
+ client_secret: str | None = None,
+ code: str | None = None,
+ extra_params: dict[str, Any] | None = None,
) -> str:
url = self._OAUTH_ACCESS_TOKEN_URL # type: ignore
args: dict[str, str] = {}
async def oauth2_request(
self,
url: str,
- access_token: Optional[str] = None,
- post_args: Optional[dict[str, Any]] = None,
+ access_token: str | None = None,
+ post_args: dict[str, Any] | None = None,
**args: Any,
) -> Any:
"""Fetches the given URL auth an OAuth2 access token.
_OAUTH_NO_CALLBACKS = False
_TWITTER_BASE_URL = "https://api.twitter.com/1.1"
- async def authenticate_redirect(self, callback_uri: Optional[str] = None) -> None:
+ async def authenticate_redirect(self, callback_uri: str | None = None) -> None:
"""Just like `~OAuthMixin.authorize_redirect`, but
auto-redirects if authorized.
self,
path: str,
access_token: dict[str, Any],
- post_args: Optional[dict[str, Any]] = None,
+ post_args: dict[str, Any] | None = None,
**args: Any,
) -> Any:
"""Fetches the given API path, e.g., ``statuses/user_timeline/btaylor``
self,
redirect_uri: str,
code: str,
- client_id: Optional[str] = None,
- client_secret: Optional[str] = None,
+ client_id: str | None = None,
+ client_secret: str | None = None,
) -> dict[str, Any]:
"""Handles the login for the Google user, returning an access token.
client_id: str,
client_secret: str,
code: str,
- extra_fields: Optional[dict[str, Any]] = None,
- ) -> Optional[dict[str, Any]]:
+ extra_fields: dict[str, Any] | None = None,
+ ) -> dict[str, Any] | None:
"""Handles the login for the Facebook user, returning a user object.
Example usage:
async def facebook_request(
self,
path: str,
- access_token: Optional[str] = None,
- post_args: Optional[dict[str, Any]] = None,
+ access_token: str | None = None,
+ post_args: dict[str, Any] | None = None,
**args: Any,
) -> Any:
"""Fetches the given relative API path, e.g., "/btaylor/picture"
method: str,
url: str,
parameters: dict[str, Any] = {},
- token: Optional[dict[str, Any]] = None,
+ token: dict[str, Any] | None = None,
) -> bytes:
"""Calculates the HMAC-SHA1 OAuth signature for the given request.
method: str,
url: str,
parameters: dict[str, Any] = {},
- token: Optional[dict[str, Any]] = None,
+ token: dict[str, Any] | None = None,
) -> bytes:
"""Calculates the HMAC-SHA1 OAuth 1.0a signature for the given request.
return binascii.b2a_base64(hash.digest())[:-1]
-def _oauth_escape(val: Union[str, bytes]) -> str:
+def _oauth_escape(val: str | bytes) -> str:
if isinstance(val, unicode_type):
val = val.encode("utf-8")
return urllib.parse.quote(val, safe="~")
except ImportError:
signal = None # type: ignore
-from typing import Callable, Optional, Union
+from typing import Callable
# os.execv is broken on Windows and can't properly parse command line
# arguments and executable name if they contain whitespaces. subprocess
weakref.WeakKeyDictionary()
)
_autoreload_is_main = False
-_original_argv: Optional[list[str]] = None
+_original_argv: list[str] | None = None
_original_spec = None
# SystemExit.code is typed funny: https://github.com/python/typeshed/issues/8513
# All we care about is truthiness
- exit_status: Union[int, str, None] = 1
+ exit_status: int | str | None = 1
try:
import runpy
from tornado.log import app_log
import typing
-from typing import Any, Callable, Optional, Union
+from typing import Any, Callable, Union
_T = typing.TypeVar("_T")
def future_set_exc_info(
future: "Union[futures.Future[_T], Future[_T]]",
- exc_info: tuple[
- Optional[type], Optional[BaseException], Optional[types.TracebackType]
- ],
+ exc_info: tuple[type | None, BaseException | None, types.TracebackType | None],
) -> None:
"""Set the given ``exc_info`` as the `Future`'s exception.
)
from tornado.log import app_log
-from typing import Any, Callable, Union, Optional
+from typing import Any, Callable
import typing
if typing.TYPE_CHECKING:
class CurlAsyncHTTPClient(AsyncHTTPClient):
def initialize( # type: ignore
- self, max_clients: int = 10, defaults: Optional[dict[str, Any]] = None
+ self, max_clients: int = 10, defaults: dict[str, Any] | None = None
) -> None:
super().initialize(defaults=defaults)
# Typeshed is incomplete for CurlMulti, so just use Any for now.
tuple[HTTPRequest, Callable[[HTTPResponse], None], float]
] = collections.deque()
self._fds: dict[int, int] = {}
- self._timeout: Optional[object] = None
+ self._timeout: object | None = None
# libcurl has bugs that sometimes cause it to not report all
# relevant file descriptors and timeouts to TIMERFUNCTION/
def _finish(
self,
curl: pycurl.Curl,
- curl_error: Optional[int] = None,
- curl_message: Optional[str] = None,
+ curl_error: int | None = None,
+ curl_message: str | None = None,
) -> None:
info = curl.info # type: ignore
curl.info = None # type: ignore
buffer = info["buffer"]
if curl_error:
assert curl_message is not None
- error: Optional[CurlError] = CurlError(curl_error, curl_message)
+ error: CurlError | None = CurlError(curl_error, curl_message)
assert error is not None
code = error.code
effective_url = None
"'CurlAsyncHTTPClient' does not support async streaming_callbacks."
)
- def write_function(b: Union[bytes, bytearray]) -> int:
+ def write_function(b: bytes | bytearray) -> int:
assert request.streaming_callback is not None
self.io_loop.add_callback(request.streaming_callback, b)
return len(b)
def _curl_header_callback(
self,
headers: httputil.HTTPHeaders,
- header_callback: Optional[Callable[[str], None]],
+ header_callback: Callable[[str], None] | None,
header_line_bytes: bytes,
) -> None:
header_line = native_str(header_line_bytes.decode("latin1"))
from tornado.util import unicode_type
import typing
-from typing import Union, Any, Optional, Callable
+from typing import Any, Callable
-def xhtml_escape(value: Union[str, bytes]) -> str:
+def xhtml_escape(value: str | bytes) -> str:
"""Escapes a string so it is valid within HTML or XML.
Escapes the characters ``<``, ``>``, ``"``, ``'``, and ``&``.
return html.escape(to_unicode(value))
-def xhtml_unescape(value: Union[str, bytes]) -> str:
+def xhtml_unescape(value: str | bytes) -> str:
"""Un-escapes an XML-escaped string.
Equivalent to `html.unescape` except that this function always returns
return json.dumps(value).replace("</", "<\\/")
-def json_decode(value: Union[str, bytes]) -> Any:
+def json_decode(value: str | bytes) -> Any:
"""Returns Python objects for the given JSON string.
Supports both `str` and `bytes` inputs. Equvalent to `json.loads`.
return re.sub(r"[\x00-\x20]+", " ", value).strip()
-def url_escape(value: Union[str, bytes], plus: bool = True) -> str:
+def url_escape(value: str | bytes, plus: bool = True) -> str:
"""Returns a URL-encoded version of the given value.
Equivalent to either `urllib.parse.quote_plus` or `urllib.parse.quote` depending on the ``plus``
@typing.overload
-def url_unescape(value: Union[str, bytes], encoding: None, plus: bool = True) -> bytes:
+def url_unescape(value: str | bytes, encoding: None, plus: bool = True) -> bytes:
pass
@typing.overload
-def url_unescape(
- value: Union[str, bytes], encoding: str = "utf-8", plus: bool = True
-) -> str:
+def url_unescape(value: str | bytes, encoding: str = "utf-8", plus: bool = True) -> str:
pass
def url_unescape(
- value: Union[str, bytes], encoding: Optional[str] = "utf-8", plus: bool = True
-) -> Union[str, bytes]:
+ value: str | bytes, encoding: str | None = "utf-8", plus: bool = True
+) -> str | bytes:
"""Decodes the given value from a URL.
The argument may be either a byte or unicode string.
def parse_qs_bytes(
- qs: Union[str, bytes], keep_blank_values: bool = False, strict_parsing: bool = False
+ qs: str | bytes, keep_blank_values: bool = False, strict_parsing: bool = False
) -> dict[str, list[bytes]]:
"""Parses a query string like urlparse.parse_qs,
but takes bytes and returns the values as byte strings.
pass
-def utf8(value: Union[None, str, bytes]) -> Optional[bytes]:
+def utf8(value: None | str | bytes) -> bytes | None:
"""Converts a string argument to a byte string.
If the argument is already a byte string or None, it is returned unchanged.
pass
-def to_unicode(value: Union[None, str, bytes]) -> Optional[str]:
+def to_unicode(value: None | str | bytes) -> str | None:
"""Converts a string argument to a unicode string.
If the argument is already a unicode string or None, it is returned
def linkify(
- text: Union[str, bytes],
+ text: str | bytes,
shorten: bool = False,
- extra_params: Union[str, Callable[[str], str]] = "",
+ extra_params: str | Callable[[str], str] = "",
require_protocol: bool = False,
permitted_protocols: list[str] = ["http", "https"],
) -> str:
def coroutine(
- func: Union[Callable[..., "Generator[Any, Any, _T]"], Callable[..., _T]],
+ func: Callable[..., "Generator[Any, Any, _T]"] | Callable[..., _T],
) -> Callable[..., "Future[_T]"]:
"""Decorator for asynchronous generators.
"""
- _unfinished: dict[Future, Union[int, str]] = {}
+ _unfinished: dict[Future, int | str] = {}
def __init__(self, *args: Future, **kwargs: Future) -> None:
if args and kwargs:
futures = args
self._finished: Deque[Future] = collections.deque()
- self.current_index: Optional[Union[str, int]] = None
- self.current_future: Optional[Future] = None
- self._running_future: Optional[Future] = None
+ self.current_index: str | int | None = None
+ self.current_future: Future | None = None
+ self._running_future: Future | None = None
for future in futures:
future_add_done_callback(future, self._done_callback)
@overload
def multi(
children: Sequence[_Yieldable],
- quiet_exceptions: Union[type[Exception], tuple[type[Exception], ...]] = (),
+ quiet_exceptions: type[Exception] | tuple[type[Exception], ...] = (),
) -> Future[list]: ...
@overload
def multi(
children: Mapping[Any, _Yieldable],
- quiet_exceptions: Union[type[Exception], tuple[type[Exception], ...]] = (),
+ quiet_exceptions: type[Exception] | tuple[type[Exception], ...] = (),
) -> Future[dict]: ...
def multi(
- children: Union[Sequence[_Yieldable], Mapping[Any, _Yieldable]],
+ children: Sequence[_Yieldable] | Mapping[Any, _Yieldable],
quiet_exceptions: "Union[Type[Exception], Tuple[Type[Exception], ...]]" = (),
) -> "Union[Future[List], Future[Dict]]":
"""Runs multiple asynchronous operations in parallel.
def multi_future(
- children: Union[Sequence[_Yieldable], Mapping[Any, _Yieldable]],
+ children: Sequence[_Yieldable] | Mapping[Any, _Yieldable],
quiet_exceptions: "Union[Type[Exception], Tuple[Type[Exception], ...]]" = (),
) -> "Union[Future[List], Future[Dict]]":
"""Wait for multiple asynchronous futures in parallel.
Use `multi` instead.
"""
if isinstance(children, dict):
- keys: Optional[list] = list(children.keys())
+ keys: list | None = list(children.keys())
children_seq: Iterable = children.values()
else:
keys = None
def with_timeout(
- timeout: Union[float, datetime.timedelta],
+ timeout: float | datetime.timedelta,
future: _Yieldable,
quiet_exceptions: "Union[Type[Exception], Tuple[Type[Exception], ...]]" = (),
) -> Future:
self.ctx_run = ctx_run
self.gen = gen
self.result_future = result_future
- self.future: Union[None, Future] = _null_future
+ self.future: None | Future = _null_future
self.running = False
self.finished = False
self.io_loop = IOLoop.current()
# Save the exception for later. It's important that
# gen.throw() not be called inside this try/except block
# because that makes sys.exc_info behave unexpectedly.
- exc: Optional[Exception] = e
+ exc: Exception | None = e
else:
exc = None
finally:
from tornado.util import GzipDecompressor
-from typing import cast, Optional, Type, Callable, Union
+from typing import cast, Optional, Type, Callable
from collections.abc import Awaitable
CR_OR_LF_RE = re.compile(b"\r|\n")
def __exit__(
self,
typ: "Optional[Type[BaseException]]",
- value: Optional[BaseException],
+ value: BaseException | None,
tb: types.TracebackType,
) -> None:
if value is not None:
def __init__(
self,
no_keep_alive: bool = False,
- chunk_size: Optional[int] = None,
- max_header_size: Optional[int] = None,
- header_timeout: Optional[float] = None,
- max_body_size: Optional[int] = None,
- body_timeout: Optional[float] = None,
+ chunk_size: int | None = None,
+ max_header_size: int | None = None,
+ header_timeout: float | None = None,
+ max_body_size: int | None = None,
+ body_timeout: float | None = None,
decompress: bool = False,
) -> None:
"""
self,
stream: iostream.IOStream,
is_client: bool,
- params: Optional[HTTP1ConnectionParameters] = None,
- context: Optional[object] = None,
+ params: HTTP1ConnectionParameters | None = None,
+ context: object | None = None,
) -> None:
"""
:arg stream: an `.IOStream`
# Save the start lines after we read or write them; they
# affect later processing (e.g. 304 responses and HEAD methods
# have content-length but no bodies)
- self._request_start_line: Optional[httputil.RequestStartLine] = None
- self._response_start_line: Optional[httputil.ResponseStartLine] = None
- self._request_headers: Optional[httputil.HTTPHeaders] = None
+ self._request_start_line: httputil.RequestStartLine | None = None
+ self._response_start_line: httputil.ResponseStartLine | None = None
+ self._request_headers: httputil.HTTPHeaders | None = None
# True if we are writing output with chunked encoding.
self._chunking_output = False
# While reading a body with a content-length, this is the
# amount left to read.
- self._expected_content_remaining: Optional[int] = None
+ self._expected_content_remaining: int | None = None
# A Future for our outgoing writes, returned by IOStream.write.
- self._pending_write: Optional[Future[None]] = None
+ self._pending_write: Future[None] | None = None
def read_response(self, delegate: httputil.HTTPMessageDelegate) -> Awaitable[bool]:
"""Read a single HTTP response.
if self.is_client:
resp_start_line = httputil.parse_response_start_line(start_line_str)
self._response_start_line = resp_start_line
- start_line: Union[
- httputil.RequestStartLine, httputil.ResponseStartLine
- ] = resp_start_line
+ start_line: httputil.RequestStartLine | httputil.ResponseStartLine = (
+ resp_start_line
+ )
# TODO: this will need to change to support client-side keepalive
self._disconnect_on_finish = False
else:
quickly in CPython by breaking up reference cycles.
"""
self._write_callback = None
- self._write_future: Optional[Future[None]] = None
- self._close_callback: Optional[Callable[[], None]] = None
+ self._write_future: Future[None] | None = None
+ self._close_callback: Callable[[], None] | None = None
if self.stream is not None:
self.stream.set_close_callback(None)
- def set_close_callback(self, callback: Optional[Callable[[], None]]) -> None:
+ def set_close_callback(self, callback: Callable[[], None] | None) -> None:
"""Sets a callback that will be run when the connection is closed.
Note that this callback is slightly different from
def write_headers(
self,
- start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
+ start_line: httputil.RequestStartLine | httputil.ResponseStartLine,
headers: httputil.HTTPHeaders,
- chunk: Optional[bytes] = None,
+ chunk: bytes | None = None,
) -> "Future[None]":
"""Implements `.HTTPConnection.write_headers`."""
lines = []
code: int,
headers: httputil.HTTPHeaders,
delegate: httputil.HTTPMessageDelegate,
- ) -> Optional[Awaitable[None]]:
+ ) -> Awaitable[None] | None:
if "Content-Length" in headers:
if "," in headers["Content-Length"]:
# Proxies sometimes cause Content-Length headers to get
headers["Content-Length"] = pieces[0]
try:
- content_length: Optional[int] = parse_int(headers["Content-Length"])
+ content_length: int | None = parse_int(headers["Content-Length"])
except ValueError:
# Handles non-integer Content-Length value.
raise httputil.HTTPInputError(
def __init__(self, delegate: httputil.HTTPMessageDelegate, chunk_size: int) -> None:
self._delegate = delegate
self._chunk_size = chunk_size
- self._decompressor: Optional[GzipDecompressor] = None
+ self._decompressor: GzipDecompressor | None = None
def headers_received(
self,
- start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
+ start_line: httputil.RequestStartLine | httputil.ResponseStartLine,
headers: httputil.HTTPHeaders,
- ) -> Optional[Awaitable[None]]:
+ ) -> Awaitable[None] | None:
if headers.get("Content-Encoding", "").lower() == "gzip":
self._decompressor = GzipDecompressor()
# Downstream delegates will only see uncompressed data,
def __init__(
self,
stream: iostream.IOStream,
- params: Optional[HTTP1ConnectionParameters] = None,
- context: Optional[object] = None,
+ params: HTTP1ConnectionParameters | None = None,
+ context: object | None = None,
) -> None:
"""
:arg stream: an `.IOStream`
params = HTTP1ConnectionParameters()
self.params = params
self.context = context
- self._serving_future: Optional[Future[None]] = None
+ self._serving_future: Future[None] | None = None
async def close(self) -> None:
"""Closes the connection.
instance_cache[instance.io_loop] = instance
return instance
- def initialize(self, defaults: Optional[dict[str, Any]] = None) -> None:
+ def initialize(self, defaults: dict[str, Any] | None = None) -> None:
self.io_loop = IOLoop.current()
self.defaults = dict(HTTPRequest._DEFAULTS)
if defaults is not None:
class HTTPRequest:
"""HTTP client request object."""
- _headers: Union[dict[str, str], httputil.HTTPHeaders]
+ _headers: dict[str, str] | httputil.HTTPHeaders
# Default values for HTTPRequest parameters.
# Merged with the values on the request object by AsyncHTTPClient
self,
url: str,
method: str = "GET",
- headers: Optional[Union[dict[str, str], httputil.HTTPHeaders]] = None,
- body: Optional[Union[bytes, str]] = None,
- auth_username: Optional[str] = None,
- auth_password: Optional[str] = None,
- auth_mode: Optional[str] = None,
- connect_timeout: Optional[float] = None,
- request_timeout: Optional[float] = None,
- if_modified_since: Optional[Union[float, datetime.datetime]] = None,
- follow_redirects: Optional[bool] = None,
- max_redirects: Optional[int] = None,
- user_agent: Optional[str] = None,
- use_gzip: Optional[bool] = None,
- network_interface: Optional[str] = None,
- streaming_callback: Optional[
- Callable[[bytes], Optional[Awaitable[None]]]
- ] = None,
- header_callback: Optional[Callable[[str], None]] = None,
- prepare_curl_callback: Optional[Callable[[Any], None]] = None,
- proxy_host: Optional[str] = None,
- proxy_port: Optional[int] = None,
- proxy_username: Optional[str] = None,
- proxy_password: Optional[str] = None,
- proxy_auth_mode: Optional[str] = None,
- allow_nonstandard_methods: Optional[bool] = None,
- validate_cert: Optional[bool] = None,
- ca_certs: Optional[str] = None,
- allow_ipv6: Optional[bool] = None,
- client_key: Optional[str] = None,
- client_cert: Optional[str] = None,
- body_producer: Optional[
+ headers: dict[str, str] | httputil.HTTPHeaders | None = None,
+ body: bytes | str | None = None,
+ auth_username: str | None = None,
+ auth_password: str | None = None,
+ auth_mode: str | None = None,
+ connect_timeout: float | None = None,
+ request_timeout: float | None = None,
+ if_modified_since: float | datetime.datetime | None = None,
+ follow_redirects: bool | None = None,
+ max_redirects: int | None = None,
+ user_agent: str | None = None,
+ use_gzip: bool | None = None,
+ network_interface: str | None = None,
+ streaming_callback: None | (Callable[[bytes], Awaitable[None] | None]) = None,
+ header_callback: Callable[[str], None] | None = None,
+ prepare_curl_callback: Callable[[Any], None] | None = None,
+ proxy_host: str | None = None,
+ proxy_port: int | None = None,
+ proxy_username: str | None = None,
+ proxy_password: str | None = None,
+ proxy_auth_mode: str | None = None,
+ allow_nonstandard_methods: bool | None = None,
+ validate_cert: bool | None = None,
+ ca_certs: str | None = None,
+ allow_ipv6: bool | None = None,
+ client_key: str | None = None,
+ client_cert: str | None = None,
+ body_producer: None | (
Callable[[Callable[[bytes], None]], "Future[None]"]
- ] = None,
+ ) = None,
expect_100_continue: bool = False,
- decompress_response: Optional[bool] = None,
- ssl_options: Optional[Union[dict[str, Any], ssl.SSLContext]] = None,
+ decompress_response: bool | None = None,
+ ssl_options: dict[str, Any] | ssl.SSLContext | None = None,
) -> None:
r"""All parameters except ``url`` are optional.
self.max_redirects = max_redirects
self.user_agent = user_agent
if decompress_response is not None:
- self.decompress_response: Optional[bool] = decompress_response
+ self.decompress_response: bool | None = decompress_response
else:
self.decompress_response = use_gzip
self.network_interface = network_interface
return self._headers # type: ignore
@headers.setter
- def headers(self, value: Union[dict[str, str], httputil.HTTPHeaders]) -> None:
+ def headers(self, value: dict[str, str] | httputil.HTTPHeaders) -> None:
if value is None:
self._headers = httputil.HTTPHeaders()
else:
return self._body
@body.setter
- def body(self, value: Union[bytes, str]) -> None:
+ def body(self, value: bytes | str) -> None:
self._body = utf8(value)
"""
# I'm not sure why these don't get type-inferred from the references in __init__.
- error: Optional[BaseException] = None
+ error: BaseException | None = None
_error_is_response_code = False
request: HTTPRequest
self,
request: HTTPRequest,
code: int,
- headers: Optional[httputil.HTTPHeaders] = None,
- buffer: Optional[BytesIO] = None,
- effective_url: Optional[str] = None,
- error: Optional[BaseException] = None,
- request_time: Optional[float] = None,
- time_info: Optional[dict[str, float]] = None,
- reason: Optional[str] = None,
- start_time: Optional[float] = None,
+ headers: httputil.HTTPHeaders | None = None,
+ buffer: BytesIO | None = None,
+ effective_url: str | None = None,
+ error: BaseException | None = None,
+ request_time: float | None = None,
+ time_info: dict[str, float] | None = None,
+ reason: str | None = None,
+ start_time: float | None = None,
) -> None:
if isinstance(request, _RequestProxy):
self.request = request.request
else:
self.headers = httputil.HTTPHeaders()
self.buffer = buffer
- self._body: Optional[bytes] = None
+ self._body: bytes | None = None
if effective_url is None:
self.effective_url = request.url
else:
def __init__(
self,
code: int,
- message: Optional[str] = None,
- response: Optional[HTTPResponse] = None,
+ message: str | None = None,
+ response: HTTPResponse | None = None,
) -> None:
self.code = code
self.message = message or httputil.responses.get(code, "Unknown")
Used internally by AsyncHTTPClient implementations.
"""
- def __init__(
- self, request: HTTPRequest, defaults: Optional[dict[str, Any]]
- ) -> None:
+ def __init__(self, request: HTTPRequest, defaults: dict[str, Any] | None) -> None:
self.request = request
self.defaults = defaults
from tornado.util import Configurable
import typing
-from typing import Union, Any, Callable, Optional
+from typing import Any, Callable
from collections.abc import Awaitable
if typing.TYPE_CHECKING:
def initialize(
self,
- request_callback: Union[
- httputil.HTTPServerConnectionDelegate,
- Callable[[httputil.HTTPServerRequest], None],
- ],
+ request_callback: (
+ httputil.HTTPServerConnectionDelegate
+ | Callable[[httputil.HTTPServerRequest], None]
+ ),
no_keep_alive: bool = False,
xheaders: bool = False,
- ssl_options: Optional[Union[dict[str, Any], ssl.SSLContext]] = None,
- protocol: Optional[str] = None,
+ ssl_options: dict[str, Any] | ssl.SSLContext | None = None,
+ protocol: str | None = None,
decompress_request: bool = False,
- chunk_size: Optional[int] = None,
- max_header_size: Optional[int] = None,
- idle_connection_timeout: Optional[float] = None,
- body_timeout: Optional[float] = None,
- max_body_size: Optional[int] = None,
- max_buffer_size: Optional[int] = None,
- trusted_downstream: Optional[list[str]] = None,
+ chunk_size: int | None = None,
+ max_header_size: int | None = None,
+ idle_connection_timeout: float | None = None,
+ body_timeout: float | None = None,
+ max_body_size: int | None = None,
+ max_buffer_size: int | None = None,
+ trusted_downstream: list[str] | None = None,
) -> None:
# This method's signature is not extracted with autodoc
# because we want its arguments to appear on the class
) -> None:
self.connection = request_conn
self.request_callback = request_callback
- self.request: Optional[httputil.HTTPServerRequest] = None
+ self.request: httputil.HTTPServerRequest | None = None
self.delegate = None
self._chunks: list[bytes] = []
def headers_received(
self,
- start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
+ start_line: httputil.RequestStartLine | httputil.ResponseStartLine,
headers: httputil.HTTPHeaders,
- ) -> Optional[Awaitable[None]]:
+ ) -> Awaitable[None] | None:
self.request = httputil.HTTPServerRequest(
connection=self.connection,
start_line=typing.cast(httputil.RequestStartLine, start_line),
)
return None
- def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]:
+ def data_received(self, chunk: bytes) -> Awaitable[None] | None:
self._chunks.append(chunk)
return None
self,
stream: iostream.IOStream,
address: tuple,
- protocol: Optional[str],
- trusted_downstream: Optional[list[str]] = None,
+ protocol: str | None,
+ trusted_downstream: list[str] | None = None,
) -> None:
self.address = address
# Save the socket's address family now so we know how to
def headers_received(
self,
- start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
+ start_line: httputil.RequestStartLine | httputil.ResponseStartLine,
headers: httputil.HTTPHeaders,
- ) -> Optional[Awaitable[None]]:
+ ) -> Awaitable[None] | None:
# TODO: either make context an official part of the
# HTTPConnection interface or figure out some other way to do this.
self.connection.context._apply_xheaders(headers) # type: ignore
return self.delegate.headers_received(start_line, headers)
- def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]:
+ def data_received(self, chunk: bytes) -> Awaitable[None] | None:
return self.delegate.data_received(chunk)
def finish(self) -> None:
import typing
from typing import (
- Union,
Optional,
Callable,
Any,
Added the ``real_error`` attribute.
"""
- def __init__(self, real_error: Optional[BaseException] = None) -> None:
+ def __init__(self, real_error: BaseException | None = None) -> None:
super().__init__("Stream is closed")
self.real_error = real_error
def __init__(self) -> None:
# A sequence of (False, bytearray) and (True, memoryview) objects
- self._buffers: Deque[tuple[bool, Union[bytearray, memoryview]]] = (
- collections.deque()
- )
+ self._buffers: Deque[tuple[bool, bytearray | memoryview]] = collections.deque()
# Position in the first buffer
self._first_pos = 0
self._size = 0
# of extending an existing bytearray
_large_buf_threshold = 2048
- def append(self, data: Union[bytes, bytearray, memoryview]) -> None:
+ def append(self, data: bytes | bytearray | memoryview) -> None:
"""
Append the given piece of data (should be a buffer-compatible object).
"""
def __init__(
self,
- max_buffer_size: Optional[int] = None,
- read_chunk_size: Optional[int] = None,
- max_write_buffer_size: Optional[int] = None,
+ max_buffer_size: int | None = None,
+ read_chunk_size: int | None = None,
+ max_write_buffer_size: int | None = None,
) -> None:
"""`BaseIOStream` constructor.
# spurious failures.
self.read_chunk_size = min(read_chunk_size or 65536, self.max_buffer_size // 2)
self.max_write_buffer_size = max_write_buffer_size
- self.error: Optional[BaseException] = None
+ self.error: BaseException | None = None
self._read_buffer = bytearray()
self._read_buffer_size = 0
self._user_read_buffer = False
- self._after_user_read_buffer: Optional[bytearray] = None
+ self._after_user_read_buffer: bytearray | None = None
self._write_buffer = _StreamBuffer()
self._total_write_index = 0
self._total_write_done_index = 0
- self._read_delimiter: Optional[bytes] = None
- self._read_regex: Optional[Pattern] = None
- self._read_max_bytes: Optional[int] = None
- self._read_bytes: Optional[int] = None
+ self._read_delimiter: bytes | None = None
+ self._read_regex: Pattern | None = None
+ self._read_max_bytes: int | None = None
+ self._read_bytes: int | None = None
self._read_partial = False
self._read_until_close = False
- self._read_future: Optional[Future] = None
+ self._read_future: Future | None = None
self._write_futures: Deque[tuple[int, Future[None]]] = collections.deque()
- self._close_callback: Optional[Callable[[], None]] = None
- self._connect_future: Optional[Future[IOStream]] = None
+ self._close_callback: Callable[[], None] | None = None
+ self._connect_future: Future[IOStream] | None = None
# _ssl_connect_future should be defined in SSLIOStream
# but it's here so we can clean it up in _signal_closed
# TODO: refactor that so subclasses can add additional futures
# to be cancelled.
- self._ssl_connect_future: Optional[Future[SSLIOStream]] = None
+ self._ssl_connect_future: Future[SSLIOStream] | None = None
self._connecting = False
- self._state: Optional[int] = None
+ self._state: int | None = None
self._closed = False
- def fileno(self) -> Union[int, ioloop._Selectable]:
+ def fileno(self) -> int | ioloop._Selectable:
"""Returns the file descriptor for this stream."""
raise NotImplementedError()
"""
raise NotImplementedError()
- def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
+ def read_from_fd(self, buf: bytearray | memoryview) -> int | None:
"""Attempts to read from the underlying file.
Reads up to ``len(buf)`` bytes, storing them in the buffer.
"""
raise NotImplementedError()
- def get_fd_error(self) -> Optional[Exception]:
+ def get_fd_error(self) -> Exception | None:
"""Returns information about any error on the underlying file.
This method is called after the `.IOLoop` has signaled an error on the
return None
def read_until_regex(
- self, regex: bytes, max_bytes: Optional[int] = None
+ self, regex: bytes, max_bytes: int | None = None
) -> Awaitable[bytes]:
"""Asynchronously read until we have matched the given regex.
return future
def read_until(
- self, delimiter: bytes, max_bytes: Optional[int] = None
+ self, delimiter: bytes, max_bytes: int | None = None
) -> Awaitable[bytes]:
"""Asynchronously read until we have found the given delimiter.
raise
return future
- def write(self, data: Union[bytes, memoryview]) -> "Future[None]":
+ def write(self, data: bytes | memoryview) -> "Future[None]":
"""Asynchronously write the given data to this stream.
This method returns a `.Future` that resolves (with a result
self._maybe_add_error_listener()
return future
- def set_close_callback(self, callback: Optional[Callable[[], None]]) -> None:
+ def set_close_callback(self, callback: Callable[[], None] | None) -> None:
"""Call the given callback when the stream is closed.
This mostly is not necessary for applications that use the
def close(
self,
- exc_info: Union[
- None,
- bool,
- BaseException,
- tuple[
+ exc_info: (
+ None
+ | bool
+ | BaseException
+ | tuple[
"Optional[Type[BaseException]]",
- Optional[BaseException],
- Optional[TracebackType],
- ],
- ] = False,
+ BaseException | None,
+ TracebackType | None,
+ ]
+ ) = False,
) -> None:
"""Close this stream.
def _handle_connect(self) -> None:
raise NotImplementedError()
- def _handle_events(self, fd: Union[int, ioloop._Selectable], events: int) -> None:
+ def _handle_events(self, fd: int | ioloop._Selectable, events: int) -> None:
if self.closed():
gen_log.warning("Got events for closed stream %s", fd)
return
self.close(exc_info=e)
raise
- def _read_to_buffer_loop(self) -> Optional[int]:
+ def _read_to_buffer_loop(self) -> int | None:
# This method is called from _handle_read and _try_inline_read.
if self._read_bytes is not None:
- target_bytes: Optional[int] = self._read_bytes
+ target_bytes: int | None = self._read_bytes
elif self._read_max_bytes is not None:
target_bytes = self._read_max_bytes
elif self.reading():
self._after_user_read_buffer = None
self._read_buffer_size = len(self._read_buffer)
self._user_read_buffer = False
- result: Union[int, bytes] = size
+ result: int | bytes = size
else:
result = self._consume(size)
if self._read_future is not None:
if not self.closed():
self._add_io_state(ioloop.IOLoop.READ)
- def _read_to_buffer(self) -> Optional[int]:
+ def _read_to_buffer(self) -> int | None:
"""Reads from the socket and appends the result to the read buffer.
Returns the number of bytes read. Returns 0 if there is nothing
while True:
try:
if self._user_read_buffer:
- buf: Union[memoryview, bytearray] = memoryview(
- self._read_buffer
- )[self._read_buffer_size :]
+ buf: memoryview | bytearray = memoryview(self._read_buffer)[
+ self._read_buffer_size :
+ ]
else:
buf = bytearray(self.read_chunk_size)
bytes_read = self.read_from_fd(buf)
self._read_partial = False
self._finish_read(pos)
- def _find_read_pos(self) -> Optional[int]:
+ def _find_read_pos(self) -> int | None:
"""Attempts to find a position in the read buffer that satisfies
the currently-pending read.
self._check_max_bytes(self._read_regex, self._read_buffer_size)
return None
- def _check_max_bytes(self, delimiter: Union[bytes, Pattern], size: int) -> None:
+ def _check_max_bytes(self, delimiter: bytes | Pattern, size: int) -> None:
if self._read_max_bytes is not None and size > self._read_max_bytes:
raise UnsatisfiableReadError(
"delimiter %r not found within %d bytes"
self.socket.setblocking(False)
super().__init__(*args, **kwargs)
- def fileno(self) -> Union[int, ioloop._Selectable]:
+ def fileno(self) -> int | ioloop._Selectable:
return self.socket
def close_fd(self) -> None:
self.socket.close()
self.socket = None # type: ignore
- def get_fd_error(self) -> Optional[Exception]:
+ def get_fd_error(self) -> Exception | None:
errno = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
return socket.error(errno, os.strerror(errno))
- def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
+ def read_from_fd(self, buf: bytearray | memoryview) -> int | None:
try:
return self.socket.recv_into(buf, len(buf))
except BlockingIOError:
del data
def connect(
- self: _IOStreamType, address: Any, server_hostname: Optional[str] = None
+ self: _IOStreamType, address: Any, server_hostname: str | None = None
) -> "Future[_IOStreamType]":
"""Connects the socket to a remote address without blocking.
def start_tls(
self,
server_side: bool,
- ssl_options: Optional[Union[dict[str, Any], ssl.SSLContext]] = None,
- server_hostname: Optional[str] = None,
+ ssl_options: dict[str, Any] | ssl.SSLContext | None = None,
+ server_hostname: str | None = None,
) -> Awaitable["SSLIOStream"]:
"""Convert this `IOStream` to an `SSLIOStream`.
self._ssl_accepting = True
self._handshake_reading = False
self._handshake_writing = False
- self._server_hostname: Optional[str] = None
+ self._server_hostname: str | None = None
# If the socket is already connected, attempt to start the handshake.
try:
super()._handle_write()
def connect(
- self, address: tuple, server_hostname: Optional[str] = None
+ self, address: tuple, server_hostname: str | None = None
) -> "Future[SSLIOStream]":
self._server_hostname = server_hostname
# Ignore the result of connect(). If it fails,
# See https://github.com/tornadoweb/tornado/pull/2008
del data
- def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
+ def read_from_fd(self, buf: bytearray | memoryview) -> int | None:
try:
if self._ssl_accepting:
# If the handshake hasn't finished yet, there can't be anything
# See https://github.com/tornadoweb/tornado/pull/2008
del data
- def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
+ def read_from_fd(self, buf: bytearray | memoryview) -> int | None:
try:
return self._fio.readinto(buf) # type: ignore
except OSError as e:
from tornado import gen, ioloop
from tornado.concurrent import Future, future_set_result_unless_cancelled
-from typing import Union, Optional, Type, Any
+from typing import Optional, Type, Any
from collections.abc import Awaitable
import typing
return result + ">"
def wait(
- self, timeout: Optional[Union[float, datetime.timedelta]] = None
+ self, timeout: float | datetime.timedelta | None = None
) -> Awaitable[bool]:
"""Wait for `.notify`.
self._value = False
def wait(
- self, timeout: Optional[Union[float, datetime.timedelta]] = None
+ self, timeout: float | datetime.timedelta | None = None
) -> Awaitable[None]:
"""Block until the internal flag is true.
def __exit__(
self,
exc_type: "Optional[Type[BaseException]]",
- exc_val: Optional[BaseException],
- exc_tb: Optional[types.TracebackType],
+ exc_val: BaseException | None,
+ exc_tb: types.TracebackType | None,
) -> None:
self._obj.release()
break
def acquire(
- self, timeout: Optional[Union[float, datetime.timedelta]] = None
+ self, timeout: float | datetime.timedelta | None = None
) -> Awaitable[_ReleasingContextManager]:
"""Decrement the counter. Returns an awaitable.
def __exit__(
self,
typ: "Optional[Type[BaseException]]",
- value: Optional[BaseException],
- traceback: Optional[types.TracebackType],
+ value: BaseException | None,
+ traceback: types.TracebackType | None,
) -> None:
self.__enter__()
async def __aexit__(
self,
typ: "Optional[Type[BaseException]]",
- value: Optional[BaseException],
- tb: Optional[types.TracebackType],
+ value: BaseException | None,
+ tb: types.TracebackType | None,
) -> None:
self.release()
return f"<{self.__class__.__name__} _block={self._block}>"
def acquire(
- self, timeout: Optional[Union[float, datetime.timedelta]] = None
+ self, timeout: float | datetime.timedelta | None = None
) -> Awaitable[_ReleasingContextManager]:
"""Attempt to lock. Returns an awaitable.
def __exit__(
self,
typ: "Optional[Type[BaseException]]",
- value: Optional[BaseException],
- tb: Optional[types.TracebackType],
+ value: BaseException | None,
+ tb: types.TracebackType | None,
) -> None:
self.__enter__()
async def __aexit__(
self,
typ: "Optional[Type[BaseException]]",
- value: Optional[BaseException],
- tb: Optional[types.TracebackType],
+ value: BaseException | None,
+ tb: types.TracebackType | None,
) -> None:
self.release()
except ImportError:
curses = None # type: ignore
-from typing import Any, cast, Optional
+from typing import Any, cast
# Logger objects for internal tornado use
access_log = logging.getLogger("tornado.access")
def enable_pretty_logging(
- options: Any = None, logger: Optional[logging.Logger] = None
+ options: Any = None, logger: logging.Logger | None = None
) -> None:
"""Turns on formatted logging output as configured.
from tornado.ioloop import IOLoop
from tornado.util import Configurable, errno_from_exception
-from typing import Callable, Any, Union, Optional
+from typing import Callable, Any
from collections.abc import Awaitable
# Note that the naming of ssl.Purpose is confusing; the purpose
def bind_sockets(
port: int,
- address: Optional[str] = None,
+ address: str | None = None,
family: socket.AddressFamily = socket.AF_UNSPEC,
backlog: int = _DEFAULT_BACKLOG,
- flags: Optional[int] = None,
+ flags: int | None = None,
reuse_port: bool = False,
) -> list[socket.socket]:
"""Creates listening sockets bound to the given port and address.
def initialize(
self,
- executor: Optional[concurrent.futures.Executor] = None,
+ executor: concurrent.futures.Executor | None = None,
close_executor: bool = True,
) -> None:
if executor is not None:
of this class.
"""
- _threadpool: Optional[concurrent.futures.ThreadPoolExecutor] = None
- _threadpool_pid: Optional[int] = None
+ _threadpool: concurrent.futures.ThreadPoolExecutor | None = None
+ _threadpool_pid: int | None = None
def initialize(self, num_threads: int = 10) -> None: # type: ignore
threadpool = ThreadedResolver._create_threadpool(num_threads)
def ssl_options_to_context(
- ssl_options: Union[dict[str, Any], ssl.SSLContext],
- server_side: Optional[bool] = None,
+ ssl_options: dict[str, Any] | ssl.SSLContext,
+ server_side: bool | None = None,
) -> ssl.SSLContext:
"""Try to convert an ``ssl_options`` dictionary to an
`~ssl.SSLContext` object.
def ssl_wrap_socket(
socket: socket.socket,
- ssl_options: Union[dict[str, Any], ssl.SSLContext],
- server_hostname: Optional[str] = None,
- server_side: Optional[bool] = None,
+ ssl_options: dict[str, Any] | ssl.SSLContext,
+ server_hostname: str | None = None,
+ server_side: bool | None = None,
**kwargs: Any,
) -> ssl.SSLSocket:
"""Returns an ``ssl.SSLSocket`` wrapping the given socket.
Any,
Callable,
TextIO,
- Optional,
)
from collections.abc import Iterator, Iterable
self,
name: str,
default: Any = None,
- type: Optional[type] = None,
- help: Optional[str] = None,
- metavar: Optional[str] = None,
+ type: type | None = None,
+ help: str | None = None,
+ metavar: str | None = None,
multiple: bool = False,
- group: Optional[str] = None,
- callback: Optional[Callable[[Any], None]] = None,
+ group: str | None = None,
+ callback: Callable[[Any], None] | None = None,
) -> None:
"""Defines a new command line option.
else:
type = str
if group:
- group_name: Optional[str] = group
+ group_name: str | None = group
else:
group_name = file_name
option = _Option(
self._options[normalized] = option
def parse_command_line(
- self, args: Optional[list[str]] = None, final: bool = True
+ self, args: list[str] | None = None, final: bool = True
) -> list[str]:
"""Parses all options given on the command line (defaults to
`sys.argv`).
if final:
self.run_parse_callbacks()
- def print_help(self, file: Optional[TextIO] = None) -> None:
+ def print_help(self, file: TextIO | None = None) -> None:
"""Prints all the command line options to stderr (or another file)."""
if file is None:
file = sys.stderr
self,
name: str,
default: Any = None,
- type: Optional[type] = None,
- help: Optional[str] = None,
- metavar: Optional[str] = None,
+ type: type | None = None,
+ help: str | None = None,
+ metavar: str | None = None,
multiple: bool = False,
- file_name: Optional[str] = None,
- group_name: Optional[str] = None,
- callback: Optional[Callable[[Any], None]] = None,
+ file_name: str | None = None,
+ group_name: str | None = None,
+ callback: Callable[[Any], None] | None = None,
) -> None:
if default is None and multiple:
default = []
def define(
name: str,
default: Any = None,
- type: Optional[type] = None,
- help: Optional[str] = None,
- metavar: Optional[str] = None,
+ type: type | None = None,
+ help: str | None = None,
+ metavar: str | None = None,
multiple: bool = False,
- group: Optional[str] = None,
- callback: Optional[Callable[[Any], None]] = None,
+ group: str | None = None,
+ callback: Callable[[Any], None] | None = None,
) -> None:
"""Defines an option in the global namespace.
)
-def parse_command_line(
- args: Optional[list[str]] = None, final: bool = True
-) -> list[str]:
+def parse_command_line(args: list[str] | None = None, final: bool = True) -> list[str]:
"""Parses global options from the command line.
See `OptionParser.parse_command_line`.
return options.parse_config_file(path, final=final)
-def print_help(file: Optional[TextIO] = None) -> None:
+def print_help(file: TextIO | None = None) -> None:
"""Prints all the command line options to stderr (or another file).
See `OptionParser.print_help`.
from typing import (
Any,
Callable,
- Optional,
Protocol,
TypeVar,
Union,
# doesn't understand dynamic proxies.
self.selector_loop = AddThreadSelectorEventLoop(asyncio_loop) # type: ignore
# Maps fd to (fileobj, handler function) pair (as in IOLoop.add_handler)
- self.handlers: dict[int, tuple[Union[int, _Selectable], Callable]] = {}
+ self.handlers: dict[int, tuple[int | _Selectable, Callable]] = {}
# Set of fds listening for reads/writes
self.readers: set[int] = set()
self.writers: set[int] = set()
self.asyncio_loop.close()
def add_handler(
- self, fd: Union[int, _Selectable], handler: Callable[..., None], events: int
+ self, fd: int | _Selectable, handler: Callable[..., None], events: int
) -> None:
fd, fileobj = self.split_fd(fd)
if fd in self.handlers:
self.selector_loop.add_writer(fd, self._handle_events, fd, IOLoop.WRITE)
self.writers.add(fd)
- def update_handler(self, fd: Union[int, _Selectable], events: int) -> None:
+ def update_handler(self, fd: int | _Selectable, events: int) -> None:
fd, fileobj = self.split_fd(fd)
if events & IOLoop.READ:
if fd not in self.readers:
self.selector_loop.remove_writer(fd)
self.writers.remove(fd)
- def remove_handler(self, fd: Union[int, _Selectable]) -> None:
+ def remove_handler(self, fd: int | _Selectable) -> None:
fd, fileobj = self.split_fd(fd)
if fd not in self.handlers:
return
def run_in_executor(
self,
- executor: Optional[concurrent.futures.Executor],
+ executor: concurrent.futures.Executor | None,
func: Callable[..., _T],
*args: Any,
) -> "asyncio.Future[_T]":
self._real_loop = real_loop
self._select_cond = threading.Condition()
- self._select_args: Optional[
+ self._select_args: None | (
tuple[list[_FileDescriptorLike], list[_FileDescriptorLike]]
- ] = None
+ ) = None
self._closing_selector = False
- self._thread: Optional[threading.Thread] = None
+ self._thread: threading.Thread | None = None
self._thread_manager_handle = self._thread_manager()
async def thread_manager_anext() -> None:
from tornado.log import gen_log
import typing
-from typing import Optional, Any, Callable
+from typing import Any, Callable
if typing.TYPE_CHECKING:
from typing import List # noqa: F401
_task_id = None
-def fork_processes(
- num_processes: Optional[int], max_restarts: Optional[int] = None
-) -> int:
+def fork_processes(num_processes: int | None, max_restarts: int | None = None) -> int:
"""Starts multiple worker processes.
If ``num_processes`` is None or <= 0, we detect the number of cores
gen_log.info("Starting %d processes", num_processes)
children = {}
- def start_child(i: int) -> Optional[int]:
+ def start_child(i: int) -> int | None:
pid = os.fork()
if pid == 0:
# child process
sys.exit(0)
-def task_id() -> Optional[int]:
+def task_id() -> int | None:
"""Returns the current task id, if any.
Returns None if this process was not created by `fork_processes`.
for attr in ["stdin", "stdout", "stderr"]:
if not hasattr(self, attr): # don't clobber streams set above
setattr(self, attr, getattr(self.proc, attr))
- self._exit_callback: Optional[Callable[[int], None]] = None
- self.returncode: Optional[int] = None
+ self._exit_callback: Callable[[int], None] | None = None
+ self.returncode: int | None = None
def set_exit_callback(self, callback: Callable[[int], None]) -> None:
"""Runs ``callback`` when this process exits.
from typing import (
Any,
Union,
- Optional,
overload,
)
from collections.abc import Awaitable, Sequence
def find_handler(
self, request: httputil.HTTPServerRequest, **kwargs: Any
- ) -> Optional[httputil.HTTPMessageDelegate]:
+ ) -> httputil.HTTPMessageDelegate | None:
"""Must be implemented to return an appropriate instance of `~.httputil.HTTPMessageDelegate`
that can serve the request.
Routing implementations may pass additional kwargs to extend the routing logic.
and support reversing them to original urls.
"""
- def reverse_url(self, name: str, *args: Any) -> Optional[str]:
+ def reverse_url(self, name: str, *args: Any) -> str | None:
"""Returns url string for a given route name and arguments
or ``None`` if no match is found.
) -> None:
self.server_conn = server_conn
self.request_conn = request_conn
- self.delegate: Optional[httputil.HTTPMessageDelegate] = None
+ self.delegate: httputil.HTTPMessageDelegate | None = None
self.router: Router = router
def headers_received(
self,
- start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
+ start_line: httputil.RequestStartLine | httputil.ResponseStartLine,
headers: httputil.HTTPHeaders,
- ) -> Optional[Awaitable[None]]:
+ ) -> Awaitable[None] | None:
assert isinstance(start_line, httputil.RequestStartLine)
request = httputil.HTTPServerRequest(
connection=self.request_conn,
return self.delegate.headers_received(start_line, headers)
- def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]:
+ def data_received(self, chunk: bytes) -> Awaitable[None] | None:
assert self.delegate is not None
return self.delegate.data_received(chunk)
class RuleRouter(Router):
"""Rule-based router implementation."""
- def __init__(self, rules: Optional[_RuleList] = None) -> None:
+ def __init__(self, rules: _RuleList | None = None) -> None:
"""Constructs a router from an ordered list of rules::
RuleRouter([
def find_handler(
self, request: httputil.HTTPServerRequest, **kwargs: Any
- ) -> Optional[httputil.HTTPMessageDelegate]:
+ ) -> httputil.HTTPMessageDelegate | None:
for rule in self.rules:
target_params = rule.matcher.match(request)
if target_params is not None:
def get_target_delegate(
self, target: Any, request: httputil.HTTPServerRequest, **target_params: Any
- ) -> Optional[httputil.HTTPMessageDelegate]:
+ ) -> httputil.HTTPMessageDelegate | None:
"""Returns an instance of `~.httputil.HTTPMessageDelegate` for a
Rule's target. This method is called by `~.find_handler` and can be
extended to provide additional target types.
in a rule's matcher (see `Matcher.reverse`).
"""
- def __init__(self, rules: Optional[_RuleList] = None) -> None:
+ def __init__(self, rules: _RuleList | None = None) -> None:
self.named_rules: dict[str, Any] = {}
super().__init__(rules)
return rule
- def reverse_url(self, name: str, *args: Any) -> Optional[str]:
+ def reverse_url(self, name: str, *args: Any) -> str | None:
if name in self.named_rules:
return self.named_rules[name].matcher.reverse(*args)
self,
matcher: "Matcher",
target: Any,
- target_kwargs: Optional[dict[str, Any]] = None,
- name: Optional[str] = None,
+ target_kwargs: dict[str, Any] | None = None,
+ name: str | None = None,
) -> None:
"""Constructs a Rule instance.
self.target_kwargs = target_kwargs if target_kwargs else {}
self.name = name
- def reverse(self, *args: Any) -> Optional[str]:
+ def reverse(self, *args: Any) -> str | None:
return self.matcher.reverse(*args)
def __repr__(self) -> str:
class Matcher:
"""Represents a matcher for request features."""
- def match(self, request: httputil.HTTPServerRequest) -> Optional[dict[str, Any]]:
+ def match(self, request: httputil.HTTPServerRequest) -> dict[str, Any] | None:
"""Matches current instance against the request.
:arg httputil.HTTPServerRequest request: current HTTP request
``None`` must be returned to indicate that there is no match."""
raise NotImplementedError()
- def reverse(self, *args: Any) -> Optional[str]:
+ def reverse(self, *args: Any) -> str | None:
"""Reconstructs full url from matcher instance and additional arguments."""
return None
class AnyMatches(Matcher):
"""Matches any request."""
- def match(self, request: httputil.HTTPServerRequest) -> Optional[dict[str, Any]]:
+ def match(self, request: httputil.HTTPServerRequest) -> dict[str, Any] | None:
return {}
class HostMatches(Matcher):
"""Matches requests from hosts specified by ``host_pattern`` regex."""
- def __init__(self, host_pattern: Union[str, Pattern]) -> None:
+ def __init__(self, host_pattern: str | Pattern) -> None:
if isinstance(host_pattern, basestring_type):
if not host_pattern.endswith("$"):
host_pattern += "$"
else:
self.host_pattern = host_pattern
- def match(self, request: httputil.HTTPServerRequest) -> Optional[dict[str, Any]]:
+ def match(self, request: httputil.HTTPServerRequest) -> dict[str, Any] | None:
if self.host_pattern.match(request.host_name):
return {}
self.application = application
self.host_pattern = host_pattern
- def match(self, request: httputil.HTTPServerRequest) -> Optional[dict[str, Any]]:
+ def match(self, request: httputil.HTTPServerRequest) -> dict[str, Any] | None:
# Look for default host if not behind load balancer (for debugging)
if "X-Real-Ip" not in request.headers:
if self.host_pattern.match(self.application.default_host):
class PathMatches(Matcher):
"""Matches requests with paths specified by ``path_pattern`` regex."""
- def __init__(self, path_pattern: Union[str, Pattern]) -> None:
+ def __init__(self, path_pattern: str | Pattern) -> None:
if isinstance(path_pattern, basestring_type):
if not path_pattern.endswith("$"):
path_pattern += "$"
self._path, self._group_count = self._find_groups()
- def match(self, request: httputil.HTTPServerRequest) -> Optional[dict[str, Any]]:
+ def match(self, request: httputil.HTTPServerRequest) -> dict[str, Any] | None:
match = self.regex.match(request.path)
if match is None:
return None
return dict(path_args=path_args, path_kwargs=path_kwargs)
- def reverse(self, *args: Any) -> Optional[str]:
+ def reverse(self, *args: Any) -> str | None:
if self._path is None:
raise ValueError("Cannot reverse url regex " + self.regex.pattern)
assert len(args) == self._group_count, (
converted_args.append(url_escape(utf8(a), plus=False))
return self._path % tuple(converted_args)
- def _find_groups(self) -> tuple[Optional[str], Optional[int]]:
+ def _find_groups(self) -> tuple[str | None, int | None]:
"""Returns a tuple (reverse string, group count) for a url.
For example: Given the url pattern /([0-9]{4})/([a-z-]+)/, this method
def __init__(
self,
- pattern: Union[str, Pattern],
+ pattern: str | Pattern,
handler: Any,
- kwargs: Optional[dict[str, Any]] = None,
- name: Optional[str] = None,
+ kwargs: dict[str, Any] | None = None,
+ name: str | None = None,
) -> None:
"""Parameters:
pass
-def _unquote_or_none(s: Optional[str]) -> Optional[bytes]: # noqa: F811
+def _unquote_or_none(s: str | None) -> bytes | None: # noqa: F811
"""None-safe wrapper around url_unescape to handle unmatched optional
groups correctly.
from io import BytesIO
import urllib.parse
-from typing import Any, Callable, Optional, Type, Union
+from typing import Any, Callable, Optional, Type
from collections.abc import Awaitable
from types import TracebackType
import typing
def initialize( # type: ignore
self,
max_clients: int = 10,
- hostname_mapping: Optional[dict[str, str]] = None,
+ hostname_mapping: dict[str, str] | None = None,
max_buffer_size: int = 104857600,
- resolver: Optional[Resolver] = None,
- defaults: Optional[dict[str, Any]] = None,
- max_header_size: Optional[int] = None,
- max_body_size: Optional[int] = None,
+ resolver: Resolver | None = None,
+ defaults: dict[str, Any] | None = None,
+ max_header_size: int | None = None,
+ max_body_size: int | None = None,
) -> None:
super().initialize(defaults=defaults)
self.max_clients = max_clients
self.io_loop.remove_timeout(timeout_handle)
del self.waiting[key]
- def _on_timeout(self, key: object, info: Optional[str] = None) -> None:
+ def _on_timeout(self, key: object, info: str | None = None) -> None:
"""Timeout callback of request.
Construct a timeout HTTPResponse when a timeout occurs.
def __init__(
self,
- client: Optional[SimpleAsyncHTTPClient],
+ client: SimpleAsyncHTTPClient | None,
request: HTTPRequest,
release_callback: Callable[[], None],
final_callback: Callable[[HTTPResponse], None],
self.tcp_client = tcp_client
self.max_header_size = max_header_size
self.max_body_size = max_body_size
- self.code: Optional[int] = None
- self.headers: Optional[httputil.HTTPHeaders] = None
+ self.code: int | None = None
+ self.headers: httputil.HTTPHeaders | None = None
self.chunks: list[bytes] = []
self._decompressor = None
# Timeout handle returned by IOLoop.add_timeout
if not self._handle_exception(*sys.exc_info()):
raise
- def _get_ssl_options(
- self, scheme: str
- ) -> Union[None, dict[str, Any], ssl.SSLContext]:
+ def _get_ssl_options(self, scheme: str) -> None | dict[str, Any] | ssl.SSLContext:
if scheme == "https":
if self.request.ssl_options is not None:
return self.request.ssl_options
return ssl_ctx
return None
- def _on_timeout(self, info: Optional[str] = None) -> None:
+ def _on_timeout(self, info: str | None = None) -> None:
"""Timeout callback of _HTTPConnection instance.
Raise a `HTTPTimeoutError` when a timeout occurs.
def _handle_exception(
self,
typ: "Optional[Type[BaseException]]",
- value: Optional[BaseException],
- tb: Optional[TracebackType],
+ value: BaseException | None,
+ tb: TracebackType | None,
) -> bool:
if self.final_callback is not None:
self._remove_timeout()
async def headers_received(
self,
- first_line: Union[httputil.ResponseStartLine, httputil.RequestStartLine],
+ first_line: httputil.ResponseStartLine | httputil.RequestStartLine,
headers: httputil.HTTPHeaders,
) -> None:
assert isinstance(first_line, httputil.ResponseStartLine)
def _on_end_request(self) -> None:
self.stream.close()
- def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]:
+ def data_received(self, chunk: bytes) -> Awaitable[None] | None:
if self._should_follow_redirect():
# We're going to follow a redirect so just discard the body.
return None
from tornado.netutil import Resolver
from tornado.gen import TimeoutError
-from typing import Any, Union, Tuple, Callable, Optional
+from typing import Any, Tuple, Callable
from collections.abc import Iterator
if typing.TYPE_CHECKING:
self.connect = connect
self.future: Future[tuple[socket.AddressFamily, Any, IOStream]] = Future()
- self.timeout: Optional[object] = None
- self.connect_timeout: Optional[object] = None
- self.last_error: Optional[Exception] = None
+ self.timeout: object | None = None
+ self.connect_timeout: object | None = None
+ self.last_error: Exception | None = None
self.remaining = len(addrinfo)
self.primary_addrs, self.secondary_addrs = self.split(addrinfo)
self.streams: set[IOStream] = set()
def start(
self,
timeout: float = _INITIAL_CONNECT_TIMEOUT,
- connect_timeout: Optional[Union[float, datetime.timedelta]] = None,
+ connect_timeout: float | datetime.timedelta | None = None,
) -> "Future[Tuple[socket.AddressFamily, Any, IOStream]]":
self.try_connect(iter(self.primary_addrs))
self.set_timeout(timeout)
if self.timeout is not None:
self.io_loop.remove_timeout(self.timeout)
- def set_connect_timeout(
- self, connect_timeout: Union[float, datetime.timedelta]
- ) -> None:
+ def set_connect_timeout(self, connect_timeout: float | datetime.timedelta) -> None:
self.connect_timeout = self.io_loop.add_timeout(
connect_timeout, self.on_connect_timeout
)
The ``io_loop`` argument (deprecated since version 4.1) has been removed.
"""
- def __init__(self, resolver: Optional[Resolver] = None) -> None:
+ def __init__(self, resolver: Resolver | None = None) -> None:
if resolver is not None:
self.resolver = resolver
self._own_resolver = False
host: str,
port: int,
af: socket.AddressFamily = socket.AF_UNSPEC,
- ssl_options: Optional[Union[dict[str, Any], ssl.SSLContext]] = None,
- max_buffer_size: Optional[int] = None,
- source_ip: Optional[str] = None,
- source_port: Optional[int] = None,
- timeout: Optional[Union[float, datetime.timedelta]] = None,
+ ssl_options: dict[str, Any] | ssl.SSLContext | None = None,
+ max_buffer_size: int | None = None,
+ source_ip: str | None = None,
+ source_port: int | None = None,
+ timeout: float | datetime.timedelta | None = None,
) -> IOStream:
"""Connect to the given host and port.
def _create_stream(
self,
- max_buffer_size: Optional[int],
+ max_buffer_size: int | None,
af: socket.AddressFamily,
addr: tuple,
- source_ip: Optional[str] = None,
- source_port: Optional[int] = None,
+ source_ip: str | None = None,
+ source_port: int | None = None,
) -> tuple[IOStream, "Future[IOStream]"]:
# Always connect in plaintext; we'll convert to ssl if necessary
# after one connection has completed.
from tornado.util import errno_from_exception
import typing
-from typing import Union, Any, Optional
+from typing import Any
from collections.abc import Iterable, Awaitable
if typing.TYPE_CHECKING:
def __init__(
self,
- ssl_options: Optional[Union[dict[str, Any], ssl.SSLContext]] = None,
- max_buffer_size: Optional[int] = None,
- read_chunk_size: Optional[int] = None,
+ ssl_options: dict[str, Any] | ssl.SSLContext | None = None,
+ max_buffer_size: int | None = None,
+ read_chunk_size: int | None = None,
) -> None:
self.ssl_options = ssl_options
self._sockets: dict[int, socket.socket] = {}
def listen(
self,
port: int,
- address: Optional[str] = None,
+ address: str | None = None,
family: socket.AddressFamily = socket.AF_UNSPEC,
backlog: int = _DEFAULT_BACKLOG,
- flags: Optional[int] = None,
+ flags: int | None = None,
reuse_port: bool = False,
) -> None:
"""Starts accepting connections on the given port.
def bind(
self,
port: int,
- address: Optional[str] = None,
+ address: str | None = None,
family: socket.AddressFamily = socket.AF_UNSPEC,
backlog: int = _DEFAULT_BACKLOG,
- flags: Optional[int] = None,
+ flags: int | None = None,
reuse_port: bool = False,
) -> None:
"""Binds this server to the given port on the given address.
self._pending_sockets.extend(sockets)
def start(
- self, num_processes: Optional[int] = 1, max_restarts: Optional[int] = None
+ self, num_processes: int | None = 1, max_restarts: int | None = None
) -> None:
"""Starts this server in the `.IOLoop`.
self._handlers.pop(fd)()
sock.close()
- def handle_stream(
- self, stream: IOStream, address: tuple
- ) -> Optional[Awaitable[None]]:
+ def handle_stream(self, stream: IOStream, address: tuple) -> Awaitable[None] | None:
"""Override to handle a new `.IOStream` from an incoming connection.
This method may be a coroutine; if so any exceptions it raises
from tornado.log import app_log
from tornado.util import ObjectDict, exec_in, unicode_type
-from typing import Any, Union, Callable, Optional, TextIO
+from typing import Any, Callable, Optional, TextIO
from collections.abc import Iterable
import typing
# this signature update website/sphinx/template.rst too.
def __init__(
self,
- template_string: Union[str, bytes],
+ template_string: str | bytes,
name: str = "<string>",
loader: Optional["BaseLoader"] = None,
- compress_whitespace: Union[bool, _UnsetMarker] = _UNSET,
- autoescape: Optional[Union[str, _UnsetMarker]] = _UNSET,
- whitespace: Optional[str] = None,
+ compress_whitespace: bool | _UnsetMarker = _UNSET,
+ autoescape: str | _UnsetMarker | None = _UNSET,
+ whitespace: str | None = None,
) -> None:
"""Construct a Template.
filter_whitespace(whitespace, "")
if not isinstance(autoescape, _UnsetMarker):
- self.autoescape: Optional[str] = autoescape
+ self.autoescape: str | None = autoescape
elif loader:
self.autoescape = loader.autoescape
else:
def __init__(
self,
- autoescape: Optional[str] = _DEFAULT_AUTOESCAPE,
- namespace: Optional[dict[str, Any]] = None,
- whitespace: Optional[str] = None,
+ autoescape: str | None = _DEFAULT_AUTOESCAPE,
+ namespace: dict[str, Any] | None = None,
+ whitespace: str | None = None,
) -> None:
"""Construct a template loader.
with self.lock:
self.templates = {}
- def resolve_path(self, name: str, parent_path: Optional[str] = None) -> str:
+ def resolve_path(self, name: str, parent_path: str | None = None) -> str:
"""Converts a possibly-relative path to absolute (used internally)."""
raise NotImplementedError()
- def load(self, name: str, parent_path: Optional[str] = None) -> Template:
+ def load(self, name: str, parent_path: str | None = None) -> Template:
"""Loads a template."""
name = self.resolve_path(name, parent_path=parent_path)
with self.lock:
super().__init__(**kwargs)
self.root = os.path.abspath(root_directory)
- def resolve_path(self, name: str, parent_path: Optional[str] = None) -> str:
+ def resolve_path(self, name: str, parent_path: str | None = None) -> str:
if (
parent_path
and not parent_path.startswith("<")
super().__init__(**kwargs)
self.dict = dict
- def resolve_path(self, name: str, parent_path: Optional[str] = None) -> str:
+ def resolve_path(self, name: str, parent_path: str | None = None) -> str:
if (
parent_path
and not parent_path.startswith("<")
raise NotImplementedError()
def find_named_blocks(
- self, loader: Optional[BaseLoader], named_blocks: dict[str, "_NamedBlock"]
+ self, loader: BaseLoader | None, named_blocks: dict[str, "_NamedBlock"]
) -> None:
for child in self.each_child():
child.find_named_blocks(loader, named_blocks)
block.body.generate(writer)
def find_named_blocks(
- self, loader: Optional[BaseLoader], named_blocks: dict[str, "_NamedBlock"]
+ self, loader: BaseLoader | None, named_blocks: dict[str, "_NamedBlock"]
) -> None:
named_blocks[self.name] = self
_Node.find_named_blocks(self, loader, named_blocks)
self.line = line
def find_named_blocks(
- self, loader: Optional[BaseLoader], named_blocks: dict[str, _NamedBlock]
+ self, loader: BaseLoader | None, named_blocks: dict[str, _NamedBlock]
) -> None:
assert loader is not None
included = loader.load(self.name, self.template_name)
"""
def __init__(
- self, message: str, filename: Optional[str] = None, lineno: int = 0
+ self, message: str, filename: str | None = None, lineno: int = 0
) -> None:
self.message = message
# The names "filename" and "lineno" are chosen for consistency
self,
file: TextIO,
named_blocks: dict[str, _NamedBlock],
- loader: Optional[BaseLoader],
+ loader: BaseLoader | None,
current_template: Template,
) -> None:
self.file = file
return IncludeTemplate()
def write_line(
- self, line: str, line_number: int, indent: Optional[int] = None
+ self, line: str, line_number: int, indent: int | None = None
) -> None:
if indent is None:
indent = self._indent
self.line = 1
self.pos = 0
- def find(self, needle: str, start: int = 0, end: Optional[int] = None) -> int:
+ def find(self, needle: str, start: int = 0, end: int | None = None) -> int:
assert start >= 0, start
pos = self.pos
start += pos
index -= pos
return index
- def consume(self, count: Optional[int] = None) -> str:
+ def consume(self, count: int | None = None) -> str:
if count is None:
count = len(self.text) - self.pos
newpos = self.pos + count
def __len__(self) -> int:
return self.remaining()
- def __getitem__(self, key: Union[int, slice]) -> str:
+ def __getitem__(self, key: int | slice) -> str:
if isinstance(key, slice):
size = len(self)
start, stop, step = key.indices(size)
def _parse(
reader: _TemplateReader,
template: Template,
- in_block: Optional[str] = None,
- in_loop: Optional[str] = None,
+ in_block: str | None = None,
+ in_loop: str | None = None,
) -> _ChunkList:
body = _ChunkList([])
while True:
reader.raise_parse_error("set missing statement")
block = _Statement(suffix, line)
elif operator == "autoescape":
- fn: Optional[str] = suffix.strip()
+ fn: str | None = suffix.strip()
if fn == "None":
fn = None
template.autoescape = fn
import sys
import traceback
import types
-import typing
import unittest
import tornado
class C:
def __init__(self, name):
self.name = name
- self.a: typing.Optional[C] = None
- self.b: typing.Optional[C] = None
- self.c: typing.Optional[C] = None
+ self.a: C | None = None
+ self.b: C | None = None
+ self.c: C | None = None
def __repr__(self):
return f"name={self.name}"
from typing import List, Tuple, Union, Dict, Any # noqa: F401
-linkify_tests: list[tuple[Union[str, bytes], dict[str, Any], str]] = [
+linkify_tests: list[tuple[str | bytes, dict[str, Any], str]] = [
# (input, linkify_kwargs, expected_output)
(
"hello http://world.com/!",
self.assertEqual(linked, html)
def test_xhtml_escape(self):
- tests: list[tuple[Union[str, bytes], Union[str, bytes]]] = [
+ tests: list[tuple[str | bytes, str | bytes]] = [
("<foo>", "<foo>"),
("<foo>", "<foo>"),
(b"<foo>", b"<foo>"),
self.assertEqual(unescaped, xhtml_unescape(escaped))
def test_url_escape_unicode(self):
- tests: list[tuple[Union[str, bytes], str]] = [
+ tests: list[tuple[str | bytes, str]] = [
# byte strings are passed through as-is
("\u00e9".encode(), "%C3%A9"),
("\u00e9".encode("latin1"), "%E9"),
def test_gc(self):
# GitHub issue 1769: Runner objects can get GCed unexpectedly
# while their future is alive.
- weakref_scope: list[Optional[weakref.ReferenceType]] = [None]
+ weakref_scope: list[weakref.ReferenceType | None] = [None]
def callback():
gc.collect(2)
# their loop is closed, even if they're involved in a reference
# cycle.
loop = self.get_new_ioloop()
- result: list[Optional[bool]] = []
+ result: list[bool | None] = []
wfut = []
@gen.coroutine
result.append(None)
loop = self.get_new_ioloop()
- result: list[Optional[bool]] = []
+ result: list[bool | None] = []
wfut = []
@gen.coroutine
class HTTP1ConnectionTest(AsyncTestCase):
- code: typing.Optional[int] = None
+ code: int | None = None
def setUp(self):
super().setUp()
class TestIOLoopCurrent(unittest.TestCase):
def setUp(self):
setup_with_context_manager(self, ignore_deprecation())
- self.io_loop: typing.Optional[IOLoop] = None
+ self.io_loop: IOLoop | None = None
IOLoop.clear_current()
def tearDown(self):
self.server_stream = None
self.server_accepted: Future[None] = Future()
netutil.add_accept_handler(self.listener, self.accept)
- self.client_stream: typing.Optional[IOStream] = IOStream(socket.socket())
+ self.client_stream: IOStream | None = IOStream(socket.socket())
self.io_loop.add_future(
self.client_stream.connect(("127.0.0.1", self.port)), self.stop
)
class ConditionTest(AsyncTestCase):
def setUp(self):
super().setUp()
- self.history: list[typing.Union[int, str]] = []
+ self.history: list[int | str] = []
def record_done(self, future, key):
"""Record the resolution of a Future returned by Condition.wait."""
self.assertTrue("# test.html:2" in traceback.format_exc())
def test_error_line_number_module(self):
- loader: typing.Optional[DictLoader] = None
+ loader: DictLoader | None = None
def load_generate(path, **kwargs):
assert loader is not None
settings=dict(cookie_secret=cookie_secret, key_version=key_version)
)
- def get_cookie(self, name) -> typing.Optional[str]: # type: ignore[override]
+ def get_cookie(self, name) -> str | None: # type: ignore[override]
return to_unicode(self._cookies.get(name))
def set_cookie(self, name, value, expires_days=None): # type: ignore[override]
super().__init__(methodName)
self.__stopped = False
self.__running = False
- self.__failure: Optional[_ExcInfoTuple] = None
+ self.__failure: _ExcInfoTuple | None = None
self.__stop_args: Any = None
- self.__timeout: Optional[object] = None
+ self.__timeout: object | None = None
# Not used in this class itself, but used by @gen_test
- self._test_generator: Optional[Union[Generator, Coroutine]] = None
+ self._test_generator: Generator | Coroutine | None = None
def setUp(self) -> None:
py_ver = sys.version_info
raise_exc_info(failure)
def run(
- self, result: Optional[unittest.TestResult] = None
- ) -> Optional[unittest.TestResult]:
+ self, result: unittest.TestResult | None = None
+ ) -> unittest.TestResult | None:
ret = super().run(result)
# As a last resort, if an exception escaped super.run() and wasn't
# re-raised in tearDown, raise it here. This will cause the
def wait(
self,
- condition: Optional[Callable[..., bool]] = None,
- timeout: Optional[float] = None,
+ condition: Callable[..., bool] | None = None,
+ timeout: float | None = None,
) -> Any:
"""Runs the `.IOLoop` until stop is called or timeout has passed.
@typing.overload
def gen_test(
- *, timeout: Optional[float] = None
+ *, timeout: float | None = None
) -> Callable[[Callable[..., Union[Generator, "Coroutine"]]], Callable[..., None]]:
pass
def gen_test( # noqa: F811
- func: Optional[Callable[..., Union[Generator, "Coroutine"]]] = None,
- timeout: Optional[float] = None,
-) -> Union[
- Callable[..., None],
- Callable[[Callable[..., Union[Generator, "Coroutine"]]], Callable[..., None]],
-]:
+ func: Callable[..., Union[Generator, "Coroutine"]] | None = None,
+ timeout: float | None = None,
+) -> (
+ Callable[..., None]
+ | Callable[[Callable[..., Union[Generator, "Coroutine"]]], Callable[..., None]]
+):
"""Testing equivalent of ``@gen.coroutine``, to be applied to test methods.
``@gen.coroutine`` cannot be used on tests because the `.IOLoop` is not
@functools.wraps(f)
def pre_coroutine(
self: AsyncTestCase, *args: Any, **kwargs: Any
- ) -> Union[Generator, Coroutine]:
+ ) -> Generator | Coroutine:
# Type comments used to avoid pypy3 bug.
result = f(self, *args, **kwargs)
if isinstance(result, Generator) or inspect.iscoroutine(result):
def __init__(
self,
- logger: Union[logging.Logger, basestring_type],
+ logger: logging.Logger | basestring_type,
regex: str,
required: bool = True,
- level: Optional[int] = None,
+ level: int | None = None,
) -> None:
"""Constructs an ExpectLog context manager.
self.deprecated_level_matched = 0
self.logged_stack = False
self.level = level
- self.orig_level: Optional[int] = None
+ self.orig_level: int | None = None
def filter(self, record: logging.LogRecord) -> bool:
if record.exc_info:
def __exit__(
self,
typ: "Optional[Type[BaseException]]",
- value: Optional[BaseException],
- tb: Optional[TracebackType],
+ value: BaseException | None,
+ tb: TracebackType | None,
) -> None:
if self.orig_level is not None:
self.logger.setLevel(self.orig_level)
def _unimplemented_method(self, *args: str, **kwargs: str) -> None:
raise HTTPError(405)
- head: Callable[..., Optional[Awaitable[None]]] = _unimplemented_method
- get: Callable[..., Optional[Awaitable[None]]] = _unimplemented_method
- post: Callable[..., Optional[Awaitable[None]]] = _unimplemented_method
- delete: Callable[..., Optional[Awaitable[None]]] = _unimplemented_method
- patch: Callable[..., Optional[Awaitable[None]]] = _unimplemented_method
- put: Callable[..., Optional[Awaitable[None]]] = _unimplemented_method
- options: Callable[..., Optional[Awaitable[None]]] = _unimplemented_method
-
- def prepare(self) -> Optional[Awaitable[None]]:
+ head: Callable[..., Awaitable[None] | None] = _unimplemented_method
+ get: Callable[..., Awaitable[None] | None] = _unimplemented_method
+ post: Callable[..., Awaitable[None] | None] = _unimplemented_method
+ delete: Callable[..., Awaitable[None] | None] = _unimplemented_method
+ patch: Callable[..., Awaitable[None] | None] = _unimplemented_method
+ put: Callable[..., Awaitable[None] | None] = _unimplemented_method
+ options: Callable[..., Awaitable[None] | None] = _unimplemented_method
+
+ def prepare(self) -> Awaitable[None] | None:
"""Called at the beginning of a request before `get`/`post`/etc.
Override this method to perform common initialization regardless
"""
pass
- def set_status(self, status_code: int, reason: Optional[str] = None) -> None:
+ def set_status(self, status_code: int, reason: str | None = None) -> None:
"""Sets the status code for our response.
:arg int status_code: Response status code.
@overload
def get_argument( # noqa: F811
self, name: str, default: None, strip: bool = True
- ) -> Optional[str]:
+ ) -> str | None:
pass
def get_argument( # noqa: F811
self,
name: str,
- default: Union[None, str, _ArgDefaultMarker] = _ARG_DEFAULT,
+ default: None | str | _ArgDefaultMarker = _ARG_DEFAULT,
strip: bool = True,
- ) -> Optional[str]:
+ ) -> str | None:
"""Returns the value of the argument with the given name.
If default is not provided, the argument is considered to be
@overload
def get_body_argument( # noqa: F811
self, name: str, default: None, strip: bool = True
- ) -> Optional[str]:
+ ) -> str | None:
pass
def get_body_argument( # noqa: F811
self,
name: str,
- default: Union[None, str, _ArgDefaultMarker] = _ARG_DEFAULT,
+ default: None | str | _ArgDefaultMarker = _ARG_DEFAULT,
strip: bool = True,
- ) -> Optional[str]:
+ ) -> str | None:
"""Returns the value of the argument with the given name
from the request body.
@overload
def get_query_argument( # noqa: F811
self, name: str, default: None, strip: bool = True
- ) -> Optional[str]:
+ ) -> str | None:
pass
def get_query_argument( # noqa: F811
self,
name: str,
- default: Union[None, str, _ArgDefaultMarker] = _ARG_DEFAULT,
+ default: None | str | _ArgDefaultMarker = _ARG_DEFAULT,
strip: bool = True,
- ) -> Optional[str]:
+ ) -> str | None:
"""Returns the value of the argument with the given name
from the request query string.
def _get_argument(
self,
name: str,
- default: Union[None, str, _ArgDefaultMarker],
+ default: None | str | _ArgDefaultMarker,
source: dict[str, list[bytes]],
strip: bool = True,
- ) -> Optional[str]:
+ ) -> str | None:
args = self._get_arguments(name, source, strip=strip)
if not args:
if isinstance(default, _ArgDefaultMarker):
values.append(s)
return values
- def decode_argument(self, value: bytes, name: Optional[str] = None) -> str:
+ def decode_argument(self, value: bytes, name: str | None = None) -> str:
"""Decodes an argument from the request.
The argument has been percent-decoded and is now a byte string.
pass
@overload
- def get_cookie(self, name: str, default: None = None) -> Optional[str]:
+ def get_cookie(self, name: str, default: None = None) -> str | None:
pass
- def get_cookie(self, name: str, default: Optional[str] = None) -> Optional[str]:
+ def get_cookie(self, name: str, default: str | None = None) -> str | None:
"""Returns the value of the request cookie with the given name.
If the named cookie is not present, returns ``default``.
def set_cookie(
self,
name: str,
- value: Union[str, bytes],
- domain: Optional[str] = None,
- expires: Optional[Union[float, tuple, datetime.datetime]] = None,
+ value: str | bytes,
+ domain: str | None = None,
+ expires: float | tuple | datetime.datetime | None = None,
path: str = "/",
- expires_days: Optional[float] = None,
+ expires_days: float | None = None,
# Keyword-only args start here for historical reasons.
*,
- max_age: Optional[int] = None,
+ max_age: int | None = None,
httponly: bool = False,
secure: bool = False,
- samesite: Optional[str] = None,
+ samesite: str | None = None,
**kwargs: Any,
) -> None:
"""Sets an outgoing cookie name/value with the given options.
def set_signed_cookie(
self,
name: str,
- value: Union[str, bytes],
- expires_days: Optional[float] = 30,
- version: Optional[int] = None,
+ value: str | bytes,
+ expires_days: float | None = 30,
+ version: int | None = None,
**kwargs: Any,
) -> None:
"""Signs and timestamps a cookie so it cannot be forged.
set_secure_cookie = set_signed_cookie
def create_signed_value(
- self, name: str, value: Union[str, bytes], version: Optional[int] = None
+ self, name: str, value: str | bytes, version: int | None = None
) -> bytes:
"""Signs and timestamps a string so it cannot be forged.
def get_signed_cookie(
self,
name: str,
- value: Optional[str] = None,
+ value: str | None = None,
max_age_days: float = 31,
- min_version: Optional[int] = None,
- ) -> Optional[bytes]:
+ min_version: int | None = None,
+ ) -> bytes | None:
"""Returns the given signed cookie if it validates, or None.
The decoded cookie value is returned as a byte string (unlike
get_secure_cookie = get_signed_cookie
def get_signed_cookie_key_version(
- self, name: str, value: Optional[str] = None
- ) -> Optional[int]:
+ self, name: str, value: str | None = None
+ ) -> int | None:
"""Returns the signing key version of the secure cookie.
The version is returned as int.
get_secure_cookie_key_version = get_signed_cookie_key_version
def redirect(
- self, url: str, permanent: bool = False, status: Optional[int] = None
+ self, url: str, permanent: bool = False, status: int | None = None
) -> None:
"""Sends a redirect to the given (optionally relative) URL.
self.set_header("Location", utf8(url))
self.finish()
- def write(self, chunk: Union[str, bytes, dict]) -> None:
+ def write(self, chunk: str | bytes | dict) -> None:
"""Writes the given chunk to the output buffer.
To write the output to the network, use the `flush()` method below.
future.set_result(None)
return future
- def finish(self, chunk: Optional[Union[str, bytes, dict]] = None) -> "Future[None]":
+ def finish(self, chunk: str | bytes | dict | None = None) -> "Future[None]":
"""Finishes this response, ending the HTTP request.
Passing a ``chunk`` to ``finish()`` is equivalent to passing that
def locale(self, value: tornado.locale.Locale) -> None:
self._locale = value
- def get_user_locale(self) -> Optional[tornado.locale.Locale]:
+ def get_user_locale(self) -> tornado.locale.Locale | None:
"""Override to determine the locale from the authenticated user.
If None is returned, we fall back to `get_browser_locale()`.
self.require_setting("login_url", "@tornado.web.authenticated")
return self.application.settings["login_url"]
- def get_template_path(self) -> Optional[str]:
+ def get_template_path(self) -> str | None:
"""Override to customize template path for each handler.
By default, we use the ``template_path`` application setting.
self.set_cookie(cookie_name, self._xsrf_token, **cookie_kwargs)
return self._xsrf_token
- def _get_raw_xsrf_token(self) -> tuple[Optional[int], bytes, float]:
+ def _get_raw_xsrf_token(self) -> tuple[int | None, bytes, float]:
"""Read or generate the xsrf token in its raw form.
The raw_xsrf_token is a tuple containing:
def _decode_xsrf_token(
self, cookie: str
- ) -> tuple[Optional[int], Optional[bytes], Optional[float]]:
+ ) -> tuple[int | None, bytes | None, float | None]:
"""Convert a cookie string into a the tuple form returned by
_get_raw_xsrf_token.
"""
)
def static_url(
- self, path: str, include_host: Optional[bool] = None, **kwargs: Any
+ self, path: str, include_host: bool | None = None, **kwargs: Any
) -> str:
"""Returns a static URL for the given relative static file path.
"""Alias for `Application.reverse_url`."""
return self.application.reverse_url(name, *args)
- def compute_etag(self) -> Optional[str]:
+ def compute_etag(self) -> str | None:
"""Computes the etag header to be used for this request.
By default uses a hash of the content written so far.
# in a finally block to avoid GC issues prior to Python 3.4.
self._prepared_future.set_result(None)
- def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]:
+ def data_received(self, chunk: bytes) -> Awaitable[None] | None:
"""Implement this method to handle streamed request data.
Requires the `.stream_request_body` decorator.
def log_exception(
self,
typ: "Optional[Type[BaseException]]",
- value: Optional[BaseException],
- tb: Optional[TracebackType],
+ value: BaseException | None,
+ tb: TracebackType | None,
) -> None:
"""Override to customize logging of uncaught exceptions.
def removeslash(
- method: Callable[..., Optional[Awaitable[None]]],
-) -> Callable[..., Optional[Awaitable[None]]]:
+ method: Callable[..., Awaitable[None] | None],
+) -> Callable[..., Awaitable[None] | None]:
"""Use this decorator to remove trailing slashes from the request path.
For example, a request to ``/foo/`` would redirect to ``/foo`` with this
@functools.wraps(method)
def wrapper( # type: ignore
self: RequestHandler, *args, **kwargs
- ) -> Optional[Awaitable[None]]:
+ ) -> Awaitable[None] | None:
if self.request.path.endswith("/"):
if self.request.method in ("GET", "HEAD"):
uri = self.request.path.rstrip("/")
def addslash(
- method: Callable[..., Optional[Awaitable[None]]],
-) -> Callable[..., Optional[Awaitable[None]]]:
+ method: Callable[..., Awaitable[None] | None],
+) -> Callable[..., Awaitable[None] | None]:
"""Use this decorator to add a missing trailing slash to the request path.
For example, a request to ``/foo`` would redirect to ``/foo/`` with this
@functools.wraps(method)
def wrapper( # type: ignore
self: RequestHandler, *args, **kwargs
- ) -> Optional[Awaitable[None]]:
+ ) -> Awaitable[None] | None:
if not self.request.path.endswith("/"):
if self.request.method in ("GET", "HEAD"):
uri = self.request.path + "/"
"""
def __init__(
- self, application: "Application", rules: Optional[_RuleList] = None
+ self, application: "Application", rules: _RuleList | None = None
) -> None:
assert isinstance(application, Application)
self.application = application
def get_target_delegate(
self, target: Any, request: httputil.HTTPServerRequest, **target_params: Any
- ) -> Optional[httputil.HTTPMessageDelegate]:
+ ) -> httputil.HTTPMessageDelegate | None:
if isclass(target) and issubclass(target, RequestHandler):
return self.application.get_handler_delegate(
request, target, **target_params
def __init__(
self,
- handlers: Optional[_RuleList] = None,
- default_host: Optional[str] = None,
- transforms: Optional[list[type["OutputTransform"]]] = None,
+ handlers: _RuleList | None = None,
+ default_host: str | None = None,
+ transforms: list[type["OutputTransform"]] | None = None,
**settings: Any,
) -> None:
if transforms is None:
def listen(
self,
port: int,
- address: Optional[str] = None,
+ address: str | None = None,
*,
family: socket.AddressFamily = socket.AF_UNSPEC,
backlog: int = tornado.netutil._DEFAULT_BACKLOG,
- flags: Optional[int] = None,
+ flags: int | None = None,
reuse_port: bool = False,
**kwargs: Any,
) -> HTTPServer:
except TypeError:
pass
- def __call__(
- self, request: httputil.HTTPServerRequest
- ) -> Optional[Awaitable[None]]:
+ def __call__(self, request: httputil.HTTPServerRequest) -> Awaitable[None] | None:
# Legacy HTTPServer interface
dispatcher = self.find_handler(request)
return dispatcher.execute()
self,
request: httputil.HTTPServerRequest,
target_class: type[RequestHandler],
- target_kwargs: Optional[dict[str, Any]] = None,
- path_args: Optional[list[bytes]] = None,
- path_kwargs: Optional[dict[str, bytes]] = None,
+ target_kwargs: dict[str, Any] | None = None,
+ path_args: list[bytes] | None = None,
+ path_kwargs: dict[str, bytes] | None = None,
) -> "_HandlerDelegate":
"""Returns `~.httputil.HTTPMessageDelegate` that can serve a request
for application and `RequestHandler` subclass.
application: Application,
request: httputil.HTTPServerRequest,
handler_class: type[RequestHandler],
- handler_kwargs: Optional[dict[str, Any]],
- path_args: Optional[list[bytes]],
- path_kwargs: Optional[dict[str, bytes]],
+ handler_kwargs: dict[str, Any] | None,
+ path_args: list[bytes] | None,
+ path_kwargs: dict[str, bytes] | None,
) -> None:
self.application = application
self.connection = request.connection
def headers_received(
self,
- start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
+ start_line: httputil.RequestStartLine | httputil.ResponseStartLine,
headers: httputil.HTTPHeaders,
- ) -> Optional[Awaitable[None]]:
+ ) -> Awaitable[None] | None:
if self.stream_request_body:
self.request._body_future = Future()
return self.execute()
return None
- def data_received(self, data: bytes) -> Optional[Awaitable[None]]:
+ def data_received(self, data: bytes) -> Awaitable[None] | None:
if self.stream_request_body:
return self.handler.data_received(data)
else:
else:
self.chunks = None # type: ignore
- def execute(self) -> Optional[Awaitable[None]]:
+ def execute(self) -> Awaitable[None] | None:
# If template cache is disabled (usually in the debug mode),
# re-compile templates and reload static files on every
# request so you don't need to restart to see changes
def __init__(
self,
status_code: int = 500,
- log_message: Optional[str] = None,
+ log_message: str | None = None,
*args: Any,
**kwargs: Any,
) -> None:
self.reason = kwargs.get("reason", None)
@property
- def log_message(self) -> Optional[str]:
+ def log_message(self) -> str | None:
"""
A backwards compatible way of accessing log_message.
"""
return self._log_message.replace("%", "%%")
return self._log_message
- def get_message(self) -> Optional[str]:
+ def get_message(self) -> str | None:
if self._log_message and self.args:
return self._log_message % self.args
return self._log_message
CACHE_MAX_AGE = 86400 * 365 * 10 # 10 years
- _static_hashes: dict[str, Optional[str]] = {}
+ _static_hashes: dict[str, str | None] = {}
_lock = threading.Lock() # protects _static_hashes
- def initialize(self, path: str, default_filename: Optional[str] = None) -> None:
+ def initialize(self, path: str, default_filename: str | None = None) -> None:
self.root = path
self.default_filename = default_filename
else:
assert self.request.method == "HEAD"
- def compute_etag(self) -> Optional[str]:
+ def compute_etag(self) -> str | None:
"""Sets the ``Etag`` header based on static url version.
This allows efficient ``If-None-Match`` checks against cached
abspath = os.path.abspath(os.path.join(root, path))
return abspath
- def validate_absolute_path(self, root: str, absolute_path: str) -> Optional[str]:
+ def validate_absolute_path(self, root: str, absolute_path: str) -> str | None:
"""Validate and return the absolute path.
``root`` is the configured path for the `StaticFileHandler`,
@classmethod
def get_content(
- cls, abspath: str, start: Optional[int] = None, end: Optional[int] = None
+ cls, abspath: str, start: int | None = None, end: int | None = None
) -> Generator[bytes, None, None]:
"""Retrieve the content of the requested resource which is located
at the given absolute path.
if start is not None:
file.seek(start)
if end is not None:
- remaining: Optional[int] = end - (start or 0)
+ remaining: int | None = end - (start or 0)
else:
remaining = None
while True:
stat_result = self._stat()
return stat_result.st_size
- def get_modified_time(self) -> Optional[datetime.datetime]:
+ def get_modified_time(self) -> datetime.datetime | None:
"""Returns the time that ``self.absolute_path`` was last modified.
May be overridden in subclasses. Should return a `~datetime.datetime`
pass
def get_cache_time(
- self, path: str, modified: Optional[datetime.datetime], mime_type: str
+ self, path: str, modified: datetime.datetime | None, mime_type: str
) -> int:
"""Override to customize cache control behavior.
return url_path
@classmethod
- def get_version(cls, settings: dict[str, Any], path: str) -> Optional[str]:
+ def get_version(cls, settings: dict[str, Any], path: str) -> str | None:
"""Generate the version string to be used in static URLs.
``settings`` is the `Application.settings` dictionary and ``path``
return cls._get_cached_version(abs_path)
@classmethod
- def _get_cached_version(cls, abs_path: str) -> Optional[str]:
+ def _get_cached_version(cls, abs_path: str) -> str | None:
with cls._lock:
hashes = cls._static_hashes
if abs_path not in hashes:
def authenticated(
- method: Callable[..., Optional[Awaitable[None]]],
-) -> Callable[..., Optional[Awaitable[None]]]:
+ method: Callable[..., Awaitable[None] | None],
+) -> Callable[..., Awaitable[None] | None]:
"""Decorate methods with this to require that the user be logged in.
If the user is not logged in, they will be redirected to the configured
@functools.wraps(method)
def wrapper( # type: ignore
self: RequestHandler, *args, **kwargs
- ) -> Optional[Awaitable[None]]:
+ ) -> Awaitable[None] | None:
if not self.current_user:
if self.request.method in ("GET", "HEAD"):
url = self.get_login_url()
def current_user(self) -> Any:
return self.handler.current_user
- def render(self, *args: Any, **kwargs: Any) -> Union[str, bytes]:
+ def render(self, *args: Any, **kwargs: Any) -> str | bytes:
"""Override in subclasses to return this module's output."""
raise NotImplementedError()
- def embedded_javascript(self) -> Optional[str]:
+ def embedded_javascript(self) -> str | None:
"""Override to return a JavaScript string
to be embedded in the page."""
return None
- def javascript_files(self) -> Optional[Iterable[str]]:
+ def javascript_files(self) -> Iterable[str] | None:
"""Override to return a list of JavaScript files needed by this module.
If the return values are relative paths, they will be passed to
"""
return None
- def embedded_css(self) -> Optional[str]:
+ def embedded_css(self) -> str | None:
"""Override to return a CSS string
that will be embedded in the page."""
return None
- def css_files(self) -> Optional[Iterable[str]]:
+ def css_files(self) -> Iterable[str] | None:
"""Override to returns a list of CSS files required by this module.
If the return values are relative paths, they will be passed to
"""
return None
- def html_head(self) -> Optional[str]:
+ def html_head(self) -> str | None:
"""Override to return an HTML string that will be put in the <head/>
element.
"""
return None
- def html_body(self) -> Optional[str]:
+ def html_body(self) -> str | None:
"""Override to return an HTML string that will be put at the end of
the <body/> element.
"""
def create_signed_value(
secret: _CookieSecretTypes,
name: str,
- value: Union[str, bytes],
- version: Optional[int] = None,
- clock: Optional[Callable[[], float]] = None,
- key_version: Optional[int] = None,
+ value: str | bytes,
+ version: int | None = None,
+ clock: Callable[[], float] | None = None,
+ key_version: int | None = None,
) -> bytes:
if version is None:
version = DEFAULT_SIGNED_VALUE_VERSION
# - name (not encoded; assumed to be ~alphanumeric)
# - value (base64-encoded)
# - signature (hex-encoded; no length prefix)
- def format_field(s: Union[str, bytes]) -> bytes:
+ def format_field(s: str | bytes) -> bytes:
return utf8("%d:" % len(s)) + utf8(s)
to_sign = b"|".join(
def decode_signed_value(
secret: _CookieSecretTypes,
name: str,
- value: Union[None, str, bytes],
+ value: None | str | bytes,
max_age_days: float = 31,
- clock: Optional[Callable[[], float]] = None,
- min_version: Optional[int] = None,
-) -> Optional[bytes]:
+ clock: Callable[[], float] | None = None,
+ min_version: int | None = None,
+) -> bytes | None:
if clock is None:
clock = time.time
if min_version is None:
def _decode_signed_value_v1(
- secret: Union[str, bytes],
+ secret: str | bytes,
name: str,
value: bytes,
max_age_days: float,
clock: Callable[[], float],
-) -> Optional[bytes]:
+) -> bytes | None:
parts = utf8(value).split(b"|")
if len(parts) != 3:
return None
value: bytes,
max_age_days: float,
clock: Callable[[], float],
-) -> Optional[bytes]:
+) -> bytes | None:
try:
(
key_version,
return None
-def get_signature_key_version(value: Union[str, bytes]) -> Optional[int]:
+def get_signature_key_version(value: str | bytes) -> int | None:
value = utf8(value)
version = _get_version(value)
if version < 2:
return key_version
-def _create_signature_v1(secret: Union[str, bytes], *parts: Union[str, bytes]) -> bytes:
+def _create_signature_v1(secret: str | bytes, *parts: str | bytes) -> bytes:
hash = hmac.new(utf8(secret), digestmod=hashlib.sha1)
for part in parts:
hash.update(utf8(part))
return utf8(hash.hexdigest())
-def _create_signature_v2(secret: Union[str, bytes], s: bytes) -> bytes:
+def _create_signature_v2(secret: str | bytes, s: bytes) -> bytes:
hash = hmac.new(utf8(secret), digestmod=hashlib.sha256)
hash.update(utf8(s))
return utf8(hash.hexdigest())
# the server side and WebSocketClientConnection on the client
# side.
def on_ws_connection_close(
- self, close_code: Optional[int] = None, close_reason: Optional[str] = None
+ self, close_code: int | None = None, close_reason: str | None = None
) -> None:
pass
- def on_message(self, message: Union[str, bytes]) -> Optional["Awaitable[None]"]:
+ def on_message(self, message: str | bytes) -> Optional["Awaitable[None]"]:
pass
def on_ping(self, data: bytes) -> None:
def log_exception(
self,
- typ: Optional[type[BaseException]],
- value: Optional[BaseException],
- tb: Optional[TracebackType],
+ typ: type[BaseException] | None,
+ value: BaseException | None,
+ tb: TracebackType | None,
) -> None:
pass
class _WebSocketParams:
def __init__(
self,
- ping_interval: Optional[float] = None,
- ping_timeout: Optional[float] = None,
+ ping_interval: float | None = None,
+ ping_timeout: float | None = None,
max_message_size: int = _default_max_message_size,
- compression_options: Optional[dict[str, Any]] = None,
+ compression_options: dict[str, Any] | None = None,
) -> None:
self.ping_interval = ping_interval
self.ping_timeout = ping_timeout
**kwargs: Any,
) -> None:
super().__init__(application, request, **kwargs)
- self.ws_connection: Optional[WebSocketProtocol] = None
- self.close_code: Optional[int] = None
- self.close_reason: Optional[str] = None
+ self.ws_connection: WebSocketProtocol | None = None
+ self.close_code: int | None = None
+ self.close_reason: str | None = None
self._on_close_called = False
async def get(self, *args: Any, **kwargs: Any) -> None:
self.set_header("Sec-WebSocket-Version", "7, 8, 13")
@property
- def ping_interval(self) -> Optional[float]:
+ def ping_interval(self) -> float | None:
"""The interval for sending websocket pings.
If this is non-zero, the websocket will send a ping every
return self.settings.get("websocket_ping_interval", None)
@property
- def ping_timeout(self) -> Optional[float]:
+ def ping_timeout(self) -> float | None:
"""Timeout if no pong is received in this many seconds.
To be used in combination with ``websocket_ping_interval > 0``.
)
def write_message(
- self, message: Union[bytes, str, dict[str, Any]], binary: bool = False
+ self, message: bytes | str | dict[str, Any], binary: bool = False
) -> "Future[None]":
"""Sends the given message to the client of this Web Socket.
message = tornado.escape.json_encode(message)
return self.ws_connection.write_message(message, binary=binary)
- def select_subprotocol(self, subprotocols: list[str]) -> Optional[str]:
+ def select_subprotocol(self, subprotocols: list[str]) -> str | None:
"""Override to implement subprotocol negotiation.
``subprotocols`` is a list of strings identifying the
return None
@property
- def selected_subprotocol(self) -> Optional[str]:
+ def selected_subprotocol(self) -> str | None:
"""The subprotocol returned by `select_subprotocol`.
.. versionadded:: 5.1
assert self.ws_connection is not None
return self.ws_connection.selected_subprotocol
- def get_compression_options(self) -> Optional[dict[str, Any]]:
+ def get_compression_options(self) -> dict[str, Any] | None:
"""Override to return compression options for the connection.
If this method returns None (the default), compression will
# TODO: Add wbits option.
return None
- def _open(self, *args: str, **kwargs: str) -> Optional[Awaitable[None]]:
+ def _open(self, *args: str, **kwargs: str) -> Awaitable[None] | None:
pass
- open: Callable[..., Optional[Awaitable[None]]] = _open
+ open: Callable[..., Awaitable[None] | None] = _open
"""Invoked when a new WebSocket is opened.
The arguments to `open` are extracted from the `tornado.web.URLSpec`
``open`` may be a coroutine.
"""
- def on_message(self, message: Union[str, bytes]) -> Optional[Awaitable[None]]:
+ def on_message(self, message: str | bytes) -> Awaitable[None] | None:
"""Handle incoming messages on the WebSocket
This method must be overridden.
"""
raise NotImplementedError
- def ping(self, data: Union[str, bytes] = b"") -> None:
+ def ping(self, data: str | bytes = b"") -> None:
"""Send ping frame to the remote end.
The data argument allows a small amount of data (up to 125
"""
pass
- def close(self, code: Optional[int] = None, reason: Optional[str] = None) -> None:
+ def close(self, code: int | None = None, reason: str | None = None) -> None:
"""Closes this Web Socket.
Once the close handshake is successful the socket will be closed.
self._break_cycles()
def on_ws_connection_close(
- self, close_code: Optional[int] = None, close_reason: Optional[str] = None
+ self, close_code: int | None = None, close_reason: str | None = None
) -> None:
self.close_code = close_code
self.close_reason = close_reason
def __init__(self, handler: "_WebSocketDelegate") -> None:
self.handler = handler
- self.stream: Optional[IOStream] = None
+ self.stream: IOStream | None = None
self.client_terminated = False
self.server_terminated = False
self.close() # let the subclass cleanup
@abc.abstractmethod
- def close(self, code: Optional[int] = None, reason: Optional[str] = None) -> None:
+ def close(self, code: int | None = None, reason: str | None = None) -> None:
raise NotImplementedError()
@abc.abstractmethod
@abc.abstractmethod
def write_message(
- self, message: Union[str, bytes, dict[str, Any]], binary: bool = False
+ self, message: str | bytes | dict[str, Any], binary: bool = False
) -> "Future[None]":
raise NotImplementedError()
@property
@abc.abstractmethod
- def selected_subprotocol(self) -> Optional[str]:
+ def selected_subprotocol(self) -> str | None:
raise NotImplementedError()
@abc.abstractmethod
# boundary is currently pretty ad-hoc.
@abc.abstractmethod
def _process_server_headers(
- self, key: Union[str, bytes], headers: httputil.HTTPHeaders
+ self, key: str | bytes, headers: httputil.HTTPHeaders
) -> None:
raise NotImplementedError()
def __init__(
self,
persistent: bool,
- max_wbits: Optional[int],
- compression_options: Optional[dict[str, Any]] = None,
+ max_wbits: int | None,
+ compression_options: dict[str, Any] | None = None,
) -> None:
if max_wbits is None:
max_wbits = zlib.MAX_WBITS
self._mem_level = compression_options["mem_level"]
if persistent:
- self._compressor: Optional[_Compressor] = self._create_compressor()
+ self._compressor: _Compressor | None = self._create_compressor()
else:
self._compressor = None
def __init__(
self,
persistent: bool,
- max_wbits: Optional[int],
+ max_wbits: int | None,
max_message_size: int,
- compression_options: Optional[dict[str, Any]] = None,
+ compression_options: dict[str, Any] | None = None,
) -> None:
self._max_message_size = max_message_size
if max_wbits is None:
)
self._max_wbits = max_wbits
if persistent:
- self._decompressor: Optional[_Decompressor] = self._create_decompressor()
+ self._decompressor: _Decompressor | None = self._create_decompressor()
else:
self._decompressor = None
self._final_frame = False
self._frame_opcode = None
self._masked_frame = None
- self._frame_mask: Optional[bytes] = None
+ self._frame_mask: bytes | None = None
self._frame_length = None
- self._fragmented_message_buffer: Optional[bytearray] = None
+ self._fragmented_message_buffer: bytearray | None = None
self._fragmented_message_opcode = None
self._waiting: object = None
self._compression_options = params.compression_options
- self._decompressor: Optional[_PerMessageDeflateDecompressor] = None
- self._compressor: Optional[_PerMessageDeflateCompressor] = None
- self._frame_compressed: Optional[bool] = None
+ self._decompressor: _PerMessageDeflateDecompressor | None = None
+ self._compressor: _PerMessageDeflateCompressor | None = None
+ self._frame_compressed: bool | None = None
# The total uncompressed size of all messages received or sent.
# Unicode messages are encoded to utf8.
# Only for testing; subject to change.
self._wire_bytes_in = 0
self._wire_bytes_out = 0
self._received_pong: bool = False
- self.close_code: Optional[int] = None
- self.close_reason: Optional[str] = None
- self._ping_coroutine: Optional[asyncio.Task] = None
+ self.close_code: int | None = None
+ self.close_reason: str | None = None
+ self._ping_coroutine: asyncio.Task | None = None
# Use a property for this to satisfy the abc.
@property
- def selected_subprotocol(self) -> Optional[str]:
+ def selected_subprotocol(self) -> str | None:
return self._selected_subprotocol
@selected_subprotocol.setter
- def selected_subprotocol(self, value: Optional[str]) -> None:
+ def selected_subprotocol(self, value: str | None) -> None:
self._selected_subprotocol = value
async def accept_connection(self, handler: WebSocketHandler) -> None:
raise ValueError("Missing/Invalid WebSocket headers")
@staticmethod
- def compute_accept_value(key: Union[str, bytes]) -> str:
+ def compute_accept_value(key: str | bytes) -> str:
"""Computes the value for the Sec-WebSocket-Accept header,
given the value for Sec-WebSocket-Key.
"""
return []
def _process_server_headers(
- self, key: Union[str, bytes], headers: httputil.HTTPHeaders
+ self, key: str | bytes, headers: httputil.HTTPHeaders
) -> None:
"""Process the headers sent by the server to this client connection.
self,
side: str,
agreed_parameters: dict[str, Any],
- compression_options: Optional[dict[str, Any]] = None,
+ compression_options: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Converts a websocket agreed_parameters set to keyword arguments
for our compressor objects.
self,
side: str,
agreed_parameters: dict[str, Any],
- compression_options: Optional[dict[str, Any]] = None,
+ compression_options: dict[str, Any] | None = None,
) -> None:
# TODO: handle invalid parameters gracefully
allowed_keys = {
return self.stream.write(frame)
def write_message(
- self, message: Union[str, bytes, dict[str, Any]], binary: bool = False
+ self, message: str | bytes | dict[str, Any], binary: bool = False
) -> "Future[None]":
"""Sends the given message to the client of this Web Socket."""
if binary:
self._abort()
return None
- def close(self, code: Optional[int] = None, reason: Optional[str] = None) -> None:
+ def close(self, code: int | None = None, reason: str | None = None) -> None:
"""Closes the WebSocket connection."""
if not self.server_terminated:
if not self.stream.closed():
`websocket_connect` function instead.
"""
- protocol: Optional[WebSocketProtocol] = None
+ protocol: WebSocketProtocol | None = None
def __init__(
self,
request: httpclient.HTTPRequest,
- on_message_callback: Optional[Callable[[Union[None, str, bytes]], None]] = None,
- compression_options: Optional[dict[str, Any]] = None,
- ping_interval: Optional[float] = None,
- ping_timeout: Optional[float] = None,
+ on_message_callback: Callable[[None | str | bytes], None] | None = None,
+ compression_options: dict[str, Any] | None = None,
+ ping_interval: float | None = None,
+ ping_timeout: float | None = None,
max_message_size: int = _default_max_message_size,
- subprotocols: Optional[list[str]] = None,
- resolver: Optional[Resolver] = None,
+ subprotocols: list[str] | None = None,
+ resolver: Resolver | None = None,
) -> None:
self.connect_future: Future[WebSocketClientConnection] = Future()
- self.read_queue: Queue[Union[None, str, bytes]] = Queue(1)
+ self.read_queue: Queue[None | str | bytes] = Queue(1)
self.key = base64.b64encode(os.urandom(16))
self._on_message_callback = on_message_callback
- self.close_code: Optional[int] = None
- self.close_reason: Optional[str] = None
+ self.close_code: int | None = None
+ self.close_reason: str | None = None
self.params = _WebSocketParams(
ping_interval=ping_interval,
ping_timeout=ping_timeout,
# dependent on GC timing).
warnings.warn("Unclosed WebSocketClientConnection", ResourceWarning)
- def close(self, code: Optional[int] = None, reason: Optional[str] = None) -> None:
+ def close(self, code: int | None = None, reason: str | None = None) -> None:
"""Closes the websocket connection.
``code`` and ``reason`` are documented under
super().on_connection_close()
def on_ws_connection_close(
- self, close_code: Optional[int] = None, close_reason: Optional[str] = None
+ self, close_code: int | None = None, close_reason: str | None = None
) -> None:
self.close_code = close_code
self.close_reason = close_reason
async def headers_received(
self,
- start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
+ start_line: httputil.RequestStartLine | httputil.ResponseStartLine,
headers: httputil.HTTPHeaders,
) -> None:
assert isinstance(start_line, httputil.ResponseStartLine)
future_set_result_unless_cancelled(self.connect_future, self)
def write_message(
- self, message: Union[str, bytes, dict[str, Any]], binary: bool = False
+ self, message: str | bytes | dict[str, Any], binary: bool = False
) -> "Future[None]":
"""Sends a message to the WebSocket server.
def read_message(
self,
- callback: Optional[Callable[["Future[Union[None, str, bytes]]"], None]] = None,
- ) -> Awaitable[Union[None, str, bytes]]:
+ callback: Callable[["Future[Union[None, str, bytes]]"], None] | None = None,
+ ) -> Awaitable[None | str | bytes]:
"""Reads a message from the WebSocket server.
If on_message_callback was specified at WebSocket
self.io_loop.add_future(asyncio.ensure_future(awaitable), callback)
return awaitable
- def on_message(self, message: Union[str, bytes]) -> Optional[Awaitable[None]]:
+ def on_message(self, message: str | bytes) -> Awaitable[None] | None:
return self._on_message(message)
- def _on_message(
- self, message: Union[None, str, bytes]
- ) -> Optional[Awaitable[None]]:
+ def _on_message(self, message: None | str | bytes) -> Awaitable[None] | None:
if self._on_message_callback:
self._on_message_callback(message)
return None
return WebSocketProtocol13(self, mask_outgoing=True, params=self.params)
@property
- def selected_subprotocol(self) -> Optional[str]:
+ def selected_subprotocol(self) -> str | None:
"""The subprotocol selected by the server.
.. versionadded:: 5.1
def log_exception(
self,
typ: "Optional[Type[BaseException]]",
- value: Optional[BaseException],
- tb: Optional[TracebackType],
+ value: BaseException | None,
+ tb: TracebackType | None,
) -> None:
assert typ is not None
assert value is not None
def websocket_connect(
- url: Union[str, httpclient.HTTPRequest],
- callback: Optional[Callable[["Future[WebSocketClientConnection]"], None]] = None,
- connect_timeout: Optional[float] = None,
- on_message_callback: Optional[Callable[[Union[None, str, bytes]], None]] = None,
- compression_options: Optional[dict[str, Any]] = None,
- ping_interval: Optional[float] = None,
- ping_timeout: Optional[float] = None,
+ url: str | httpclient.HTTPRequest,
+ callback: Callable[["Future[WebSocketClientConnection]"], None] | None = None,
+ connect_timeout: float | None = None,
+ on_message_callback: Callable[[None | str | bytes], None] | None = None,
+ compression_options: dict[str, Any] | None = None,
+ ping_interval: float | None = None,
+ ping_timeout: float | None = None,
max_message_size: int = _default_max_message_size,
- subprotocols: Optional[list[str]] = None,
- resolver: Optional[Resolver] = None,
+ subprotocols: list[str] | None = None,
+ resolver: Resolver | None = None,
) -> "Awaitable[WebSocketClientConnection]":
"""Client-side websocket support.
def __init__(
self,
wsgi_application: "WSGIAppType",
- executor: Optional[concurrent.futures.Executor] = None,
+ executor: concurrent.futures.Executor | None = None,
) -> None:
self.wsgi_application = wsgi_application
self.executor = dummy_executor if executor is None else executor
def start_response(
status: str,
headers: list[tuple[str, str]],
- exc_info: Optional[
+ exc_info: None | (
tuple[
"Optional[Type[BaseException]]",
- Optional[BaseException],
- Optional[TracebackType],
+ BaseException | None,
+ TracebackType | None,
]
- ] = None,
+ ) = None,
) -> Callable[[bytes], Any]:
data["status"] = status
data["headers"] = headers
try:
app_response_iter = iter(app_response)
- def next_chunk() -> Optional[bytes]:
+ def next_chunk() -> bytes | None:
try:
return next(app_response_iter)
except StopIteration: