+from __future__ import annotations
+
import typing
from contextlib import contextmanager
method: str,
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Optional[AuthTypes] = None,
- proxy: typing.Optional[ProxyTypes] = None,
- proxies: typing.Optional[ProxiesTypes] = None,
+ params: QueryParamTypes | None = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | None = None,
+ proxy: ProxyTypes | None = None,
+ proxies: ProxiesTypes | None = None,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
follow_redirects: bool = False,
verify: VerifyTypes = True,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
trust_env: bool = True,
) -> Response:
"""
method: str,
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Optional[AuthTypes] = None,
- proxy: typing.Optional[ProxyTypes] = None,
- proxies: typing.Optional[ProxiesTypes] = None,
+ params: QueryParamTypes | None = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | None = None,
+ proxy: ProxyTypes | None = None,
+ proxies: ProxiesTypes | None = None,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
follow_redirects: bool = False,
verify: VerifyTypes = True,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
trust_env: bool = True,
) -> typing.Iterator[Response]:
"""
def get(
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Optional[AuthTypes] = None,
- proxy: typing.Optional[ProxyTypes] = None,
- proxies: typing.Optional[ProxiesTypes] = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | None = None,
+ proxy: ProxyTypes | None = None,
+ proxies: ProxiesTypes | None = None,
follow_redirects: bool = False,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
verify: VerifyTypes = True,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
trust_env: bool = True,
def options(
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Optional[AuthTypes] = None,
- proxy: typing.Optional[ProxyTypes] = None,
- proxies: typing.Optional[ProxiesTypes] = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | None = None,
+ proxy: ProxyTypes | None = None,
+ proxies: ProxiesTypes | None = None,
follow_redirects: bool = False,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
verify: VerifyTypes = True,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
trust_env: bool = True,
def head(
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Optional[AuthTypes] = None,
- proxy: typing.Optional[ProxyTypes] = None,
- proxies: typing.Optional[ProxiesTypes] = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | None = None,
+ proxy: ProxyTypes | None = None,
+ proxies: ProxiesTypes | None = None,
follow_redirects: bool = False,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
verify: VerifyTypes = True,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
trust_env: bool = True,
def post(
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Optional[AuthTypes] = None,
- proxy: typing.Optional[ProxyTypes] = None,
- proxies: typing.Optional[ProxiesTypes] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | None = None,
+ proxy: ProxyTypes | None = None,
+ proxies: ProxiesTypes | None = None,
follow_redirects: bool = False,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
verify: VerifyTypes = True,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
trust_env: bool = True,
def put(
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Optional[AuthTypes] = None,
- proxy: typing.Optional[ProxyTypes] = None,
- proxies: typing.Optional[ProxiesTypes] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | None = None,
+ proxy: ProxyTypes | None = None,
+ proxies: ProxiesTypes | None = None,
follow_redirects: bool = False,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
verify: VerifyTypes = True,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
trust_env: bool = True,
def patch(
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Optional[AuthTypes] = None,
- proxy: typing.Optional[ProxyTypes] = None,
- proxies: typing.Optional[ProxiesTypes] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | None = None,
+ proxy: ProxyTypes | None = None,
+ proxies: ProxiesTypes | None = None,
follow_redirects: bool = False,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
verify: VerifyTypes = True,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
trust_env: bool = True,
def delete(
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Optional[AuthTypes] = None,
- proxy: typing.Optional[ProxyTypes] = None,
- proxies: typing.Optional[ProxiesTypes] = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | None = None,
+ proxy: ProxyTypes | None = None,
+ proxies: ProxiesTypes | None = None,
follow_redirects: bool = False,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
verify: VerifyTypes = True,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
trust_env: bool = True,
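
The hunks above all make the same mechanical change: `typing.Optional[X]` becomes `X | None`, which the newly added `from __future__ import annotations` keeps compatible with Pythons older than 3.10 by storing annotations as strings instead of evaluating them. A minimal sketch of the equivalence (illustrative only, not part of this diff):

from __future__ import annotations

import typing


# Old spelling, previously evaluated eagerly at function definition time.
def get_old(url: str, *, params: typing.Optional[dict] = None) -> None:
    ...


# New spelling: under the future import this is stored as the string
# "dict | None", so it does not need Python 3.10's runtime `X | Y` support.
def get_new(url: str, *, params: dict | None = None) -> None:
    ...


print(get_new.__annotations__["params"])  # dict | None
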
+from __future__ import annotations
+
import hashlib
import os
import re
and uses HTTP Basic authentication.
"""
- def __init__(
- self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
- ) -> None:
+ def __init__(self, username: str | bytes, password: str | bytes) -> None:
self._auth_header = self._build_auth_header(username, password)
def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
request.headers["Authorization"] = self._auth_header
yield request
- def _build_auth_header(
- self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
- ) -> str:
+ def _build_auth_header(self, username: str | bytes, password: str | bytes) -> str:
userpass = b":".join((to_bytes(username), to_bytes(password)))
token = b64encode(userpass).decode()
return f"Basic {token}"
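
For reference, the header built here is just the base64 encoding of `username:password`; a standalone sketch with hypothetical credentials:

from base64 import b64encode

username, password = "alice", "s3cret"  # illustrative values
userpass = b":".join((username.encode(), password.encode()))
token = b64encode(userpass).decode()
print(f"Authorization: Basic {token}")
# Authorization: Basic YWxpY2U6czNjcmV0
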
Use a 'netrc' file to look up basic auth credentials based on the URL host.
"""
- def __init__(self, file: typing.Optional[str] = None) -> None:
+ def __init__(self, file: str | None = None) -> None:
# Lazily import 'netrc'.
# There's no need for us to load this module unless 'NetRCAuth' is being used.
import netrc
)
yield request
- def _build_auth_header(
- self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
- ) -> str:
+ def _build_auth_header(self, username: str | bytes, password: str | bytes) -> str:
userpass = b":".join((to_bytes(username), to_bytes(password)))
token = b64encode(userpass).decode()
return f"Basic {token}"
class DigestAuth(Auth):
- _ALGORITHM_TO_HASH_FUNCTION: typing.Dict[str, typing.Callable[[bytes], "_Hash"]] = {
+ _ALGORITHM_TO_HASH_FUNCTION: dict[str, typing.Callable[[bytes], _Hash]] = {
"MD5": hashlib.md5,
"MD5-SESS": hashlib.md5,
"SHA": hashlib.sha1,
"SHA-512-SESS": hashlib.sha512,
}
- def __init__(
- self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
- ) -> None:
+ def __init__(self, username: str | bytes, password: str | bytes) -> None:
self._username = to_bytes(username)
self._password = to_bytes(password)
- self._last_challenge: typing.Optional[_DigestAuthChallenge] = None
+ self._last_challenge: _DigestAuthChallenge | None = None
self._nonce_count = 1
def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
def _parse_challenge(
self, request: Request, response: Response, auth_header: str
- ) -> "_DigestAuthChallenge":
+ ) -> _DigestAuthChallenge:
"""
Returns a challenge from a Digest WWW-Authenticate header.
These take the form of:
# This method should only ever have been called with a Digest auth header.
assert scheme.lower() == "digest"
- header_dict: typing.Dict[str, str] = {}
+ header_dict: dict[str, str] = {}
for field in parse_http_list(fields):
key, value = field.strip().split("=", 1)
header_dict[key] = unquote(value)
raise ProtocolError(message, request=request) from exc
def _build_auth_header(
- self, request: Request, challenge: "_DigestAuthChallenge"
+ self, request: Request, challenge: _DigestAuthChallenge
) -> str:
hash_func = self._ALGORITHM_TO_HASH_FUNCTION[challenge.algorithm.upper()]
return hashlib.sha1(s).hexdigest()[:16].encode()
- def _get_header_value(self, header_fields: typing.Dict[str, bytes]) -> str:
+ def _get_header_value(self, header_fields: dict[str, bytes]) -> str:
NON_QUOTED_FIELDS = ("algorithm", "qop", "nc")
QUOTED_TEMPLATE = '{}="{}"'
NON_QUOTED_TEMPLATE = "{}={}"
return header_value
- def _resolve_qop(
- self, qop: typing.Optional[bytes], request: Request
- ) -> typing.Optional[bytes]:
+ def _resolve_qop(self, qop: bytes | None, request: Request) -> bytes | None:
if qop is None:
return None
qops = re.split(b", ?", qop)
realm: bytes
nonce: bytes
algorithm: str
- opaque: typing.Optional[bytes]
- qop: typing.Optional[bytes]
+ opaque: bytes | None
+ qop: bytes | None
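
For context, `_parse_challenge` above produces these fields from a `WWW-Authenticate: Digest ...` header. A simplified standalone sketch of that parsing, using the same stdlib helpers (the header value is illustrative):

from urllib.request import parse_http_list, parse_keqv_list

header = 'Digest realm="httpx@example.org", qop="auth", nonce="abc123", opaque="xyz", algorithm=MD5'
scheme, _, fields = header.partition(" ")
assert scheme.lower() == "digest"

challenge = parse_keqv_list(parse_http_list(fields))
print(challenge)
# {'realm': 'httpx@example.org', 'qop': 'auth', 'nonce': 'abc123',
#  'opaque': 'xyz', 'algorithm': 'MD5'}
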
+from __future__ import annotations
+
import datetime
import enum
import logging
def __init__(
self,
*,
- auth: typing.Optional[AuthTypes] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
+ auth: AuthTypes | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
follow_redirects: bool = False,
max_redirects: int = DEFAULT_MAX_REDIRECTS,
- event_hooks: typing.Optional[
- typing.Mapping[str, typing.List[EventHook]]
- ] = None,
+ event_hooks: typing.Mapping[str, list[EventHook]] | None = None,
base_url: URLTypes = "",
trust_env: bool = True,
- default_encoding: typing.Union[str, typing.Callable[[bytes], str]] = "utf-8",
+ default_encoding: str | typing.Callable[[bytes], str] = "utf-8",
) -> None:
event_hooks = {} if event_hooks is None else event_hooks
return url.copy_with(raw_path=url.raw_path + b"/")
def _get_proxy_map(
- self, proxies: typing.Optional[ProxiesTypes], allow_env_proxies: bool
- ) -> typing.Dict[str, typing.Optional[Proxy]]:
+ self, proxies: ProxiesTypes | None, allow_env_proxies: bool
+ ) -> dict[str, Proxy | None]:
if proxies is None:
if allow_env_proxies:
return {
self._timeout = Timeout(timeout)
@property
- def event_hooks(self) -> typing.Dict[str, typing.List[EventHook]]:
+ def event_hooks(self) -> dict[str, list[EventHook]]:
return self._event_hooks
@event_hooks.setter
- def event_hooks(
- self, event_hooks: typing.Dict[str, typing.List[EventHook]]
- ) -> None:
+ def event_hooks(self, event_hooks: dict[str, list[EventHook]]) -> None:
self._event_hooks = {
"request": list(event_hooks.get("request", [])),
"response": list(event_hooks.get("response", [])),
}
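
The `event_hooks` property manages per-client callbacks for outgoing requests and incoming responses; a usage sketch with hypothetical logging hooks:

import httpx


def log_request(request: httpx.Request) -> None:
    # Hypothetical hook: called after the request is prepared, before it is sent.
    print(f"Sending {request.method} {request.url}")


def log_response(response: httpx.Response) -> None:
    # Hypothetical hook: called once the response headers are available.
    print(f"Received {response.status_code} for {response.request.url}")


client = httpx.Client(event_hooks={"request": [log_request], "response": [log_response]})
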
@property
- def auth(self) -> typing.Optional[Auth]:
+ def auth(self) -> Auth | None:
"""
Authentication class used when none is passed at the request-level.
method: str,
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Request:
"""
Build and return a request instance.
return self.base_url.copy_with(raw_path=merge_raw_path)
return merge_url
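
`_merge_url` resolves relative request URLs against the client's `base_url` (with the trailing slash enforced earlier); a usage sketch of the documented behaviour:

import httpx

client = httpx.Client(base_url="https://example.com/api")
request = client.build_request("GET", "/users")
print(request.url)  # https://example.com/api/users
client.close()
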
- def _merge_cookies(
- self, cookies: typing.Optional[CookieTypes] = None
- ) -> typing.Optional[CookieTypes]:
+ def _merge_cookies(self, cookies: CookieTypes | None = None) -> CookieTypes | None:
"""
Merge a cookies argument together with any cookies on the client,
to create the cookies used for the outgoing request.
return merged_cookies
return cookies
- def _merge_headers(
- self, headers: typing.Optional[HeaderTypes] = None
- ) -> typing.Optional[HeaderTypes]:
+ def _merge_headers(self, headers: HeaderTypes | None = None) -> HeaderTypes | None:
"""
Merge a headers argument together with any headers on the client,
to create the headers used for the outgoing request.
return merged_headers
def _merge_queryparams(
- self, params: typing.Optional[QueryParamTypes] = None
- ) -> typing.Optional[QueryParamTypes]:
+ self, params: QueryParamTypes | None = None
+ ) -> QueryParamTypes | None:
"""
Merge a queryparams argument together with any queryparams on the client,
to create the queryparams used for the outgoing request.
return merged_queryparams.merge(params)
return params
- def _build_auth(self, auth: typing.Optional[AuthTypes]) -> typing.Optional[Auth]:
+ def _build_auth(self, auth: AuthTypes | None) -> Auth | None:
if auth is None:
return None
elif isinstance(auth, tuple):
def _build_request_auth(
self,
request: Request,
- auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT,
+ auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
) -> Auth:
auth = (
self._auth if isinstance(auth, UseClientDefault) else self._build_auth(auth)
def _redirect_stream(
self, request: Request, method: str
- ) -> typing.Optional[typing.Union[SyncByteStream, AsyncByteStream]]:
+ ) -> SyncByteStream | AsyncByteStream | None:
"""
Return the body that should be used for the redirect request.
"""
def __init__(
self,
*,
- auth: typing.Optional[AuthTypes] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
+ auth: AuthTypes | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
verify: VerifyTypes = True,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
http1: bool = True,
http2: bool = False,
- proxy: typing.Optional[ProxyTypes] = None,
- proxies: typing.Optional[ProxiesTypes] = None,
- mounts: typing.Optional[
- typing.Mapping[str, typing.Optional[BaseTransport]]
- ] = None,
+ proxy: ProxyTypes | None = None,
+ proxies: ProxiesTypes | None = None,
+ mounts: typing.Mapping[str, BaseTransport | None] | None = None,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
follow_redirects: bool = False,
limits: Limits = DEFAULT_LIMITS,
max_redirects: int = DEFAULT_MAX_REDIRECTS,
- event_hooks: typing.Optional[
- typing.Mapping[str, typing.List[EventHook]]
- ] = None,
+ event_hooks: typing.Mapping[str, list[EventHook]] | None = None,
base_url: URLTypes = "",
- transport: typing.Optional[BaseTransport] = None,
- app: typing.Optional[typing.Callable[..., typing.Any]] = None,
+ transport: BaseTransport | None = None,
+ app: typing.Callable[..., typing.Any] | None = None,
trust_env: bool = True,
- default_encoding: typing.Union[str, typing.Callable[[bytes], str]] = "utf-8",
+ default_encoding: str | typing.Callable[[bytes], str] = "utf-8",
) -> None:
super().__init__(
auth=auth,
app=app,
trust_env=trust_env,
)
- self._mounts: typing.Dict[URLPattern, typing.Optional[BaseTransport]] = {
+ self._mounts: dict[URLPattern, BaseTransport | None] = {
URLPattern(key): None
if proxy is None
else self._init_proxy_transport(
def _init_transport(
self,
verify: VerifyTypes = True,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
http1: bool = True,
http2: bool = False,
limits: Limits = DEFAULT_LIMITS,
- transport: typing.Optional[BaseTransport] = None,
- app: typing.Optional[typing.Callable[..., typing.Any]] = None,
+ transport: BaseTransport | None = None,
+ app: typing.Callable[..., typing.Any] | None = None,
trust_env: bool = True,
) -> BaseTransport:
if transport is not None:
self,
proxy: Proxy,
verify: VerifyTypes = True,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
http1: bool = True,
http2: bool = False,
limits: Limits = DEFAULT_LIMITS,
method: str,
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Build and send a request.
method: str,
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> typing.Iterator[Response]:
"""
Alternative to `httpx.request()` that streams the response body
request: Request,
*,
stream: bool = False,
- auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
+ auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
) -> Response:
"""
Send a request.
request: Request,
auth: Auth,
follow_redirects: bool,
- history: typing.List[Response],
+ history: list[Response],
) -> Response:
auth_flow = auth.sync_auth_flow(request)
try:
self,
request: Request,
follow_redirects: bool,
- history: typing.List[Response],
+ history: list[Response],
) -> Response:
while True:
if len(history) > self.max_redirects:
self,
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send a `GET` request.
self,
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send an `OPTIONS` request.
self,
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send a `HEAD` request.
self,
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send a `POST` request.
self,
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send a `PUT` request.
self,
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send a `PATCH` request.
self,
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send a `DELETE` request.
def __exit__(
self,
- exc_type: typing.Optional[typing.Type[BaseException]] = None,
- exc_value: typing.Optional[BaseException] = None,
- traceback: typing.Optional[TracebackType] = None,
+ exc_type: type[BaseException] | None = None,
+ exc_value: BaseException | None = None,
+ traceback: TracebackType | None = None,
) -> None:
self._state = ClientState.CLOSED
def __init__(
self,
*,
- auth: typing.Optional[AuthTypes] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
+ auth: AuthTypes | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
verify: VerifyTypes = True,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
http1: bool = True,
http2: bool = False,
- proxy: typing.Optional[ProxyTypes] = None,
- proxies: typing.Optional[ProxiesTypes] = None,
- mounts: typing.Optional[
- typing.Mapping[str, typing.Optional[AsyncBaseTransport]]
- ] = None,
+ proxy: ProxyTypes | None = None,
+ proxies: ProxiesTypes | None = None,
+ mounts: typing.Mapping[str, AsyncBaseTransport | None] | None = None,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
follow_redirects: bool = False,
limits: Limits = DEFAULT_LIMITS,
max_redirects: int = DEFAULT_MAX_REDIRECTS,
- event_hooks: typing.Optional[
- typing.Mapping[str, typing.List[typing.Callable[..., typing.Any]]]
- ] = None,
+ event_hooks: typing.Mapping[str, list[typing.Callable[..., typing.Any]]] | None = None,
base_url: URLTypes = "",
- transport: typing.Optional[AsyncBaseTransport] = None,
- app: typing.Optional[typing.Callable[..., typing.Any]] = None,
+ transport: AsyncBaseTransport | None = None,
+ app: typing.Callable[..., typing.Any] | None = None,
trust_env: bool = True,
- default_encoding: typing.Union[str, typing.Callable[[bytes], str]] = "utf-8",
+ default_encoding: str | typing.Callable[[bytes], str] = "utf-8",
) -> None:
super().__init__(
auth=auth,
trust_env=trust_env,
)
- self._mounts: typing.Dict[URLPattern, typing.Optional[AsyncBaseTransport]] = {
+ self._mounts: dict[URLPattern, AsyncBaseTransport | None] = {
URLPattern(key): None
if proxy is None
else self._init_proxy_transport(
def _init_transport(
self,
verify: VerifyTypes = True,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
http1: bool = True,
http2: bool = False,
limits: Limits = DEFAULT_LIMITS,
- transport: typing.Optional[AsyncBaseTransport] = None,
- app: typing.Optional[typing.Callable[..., typing.Any]] = None,
+ transport: AsyncBaseTransport | None = None,
+ app: typing.Callable[..., typing.Any] | None = None,
trust_env: bool = True,
) -> AsyncBaseTransport:
if transport is not None:
self,
proxy: Proxy,
verify: VerifyTypes = True,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
http1: bool = True,
http2: bool = False,
limits: Limits = DEFAULT_LIMITS,
method: str,
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Build and send a request.
method: str,
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> typing.AsyncIterator[Response]:
"""
Alternative to `httpx.request()` that streams the response body
request: Request,
*,
stream: bool = False,
- auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
+ auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
) -> Response:
"""
Send a request.
request: Request,
auth: Auth,
follow_redirects: bool,
- history: typing.List[Response],
+ history: list[Response],
) -> Response:
auth_flow = auth.async_auth_flow(request)
try:
self,
request: Request,
follow_redirects: bool,
- history: typing.List[Response],
+ history: list[Response],
) -> Response:
while True:
if len(history) > self.max_redirects:
self,
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send a `GET` request.
self,
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send an `OPTIONS` request.
self,
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send a `HEAD` request.
self,
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send a `POST` request.
self,
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send a `PUT` request.
self,
url: URLTypes,
*,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send a `PATCH` request.
self,
url: URLTypes,
*,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
- extensions: typing.Optional[RequestExtensions] = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
+ timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
+ extensions: RequestExtensions | None = None,
) -> Response:
"""
Send a `DELETE` request.
async def __aexit__(
self,
- exc_type: typing.Optional[typing.Type[BaseException]] = None,
- exc_value: typing.Optional[BaseException] = None,
- traceback: typing.Optional[TracebackType] = None,
+ exc_type: type[BaseException] | None = None,
+ exc_value: BaseException | None = None,
+ traceback: TracebackType | None = None,
) -> None:
self._state = ClientState.CLOSED
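
Both clients are normally used as context managers, which is what drives the `__exit__`/`__aexit__` hooks shown above; a usage sketch:

import asyncio

import httpx


def sync_example() -> None:
    with httpx.Client() as client:
        response = client.get("https://www.example.org/")
        print(response.status_code)


async def async_example() -> None:
    async with httpx.AsyncClient() as client:
        response = await client.get("https://www.example.org/")
        print(response.status_code)


if __name__ == "__main__":
    sync_example()
    asyncio.run(async_example())
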
+from __future__ import annotations
+
import logging
import os
import ssl
def create_ssl_context(
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
verify: VerifyTypes = True,
trust_env: bool = True,
http2: bool = False,
def __init__(
self,
*,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
verify: VerifyTypes = True,
trust_env: bool = True,
http2: bool = False,
def __init__(
self,
- timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
+ timeout: TimeoutTypes | UnsetType = UNSET,
*,
- connect: typing.Union[None, float, UnsetType] = UNSET,
- read: typing.Union[None, float, UnsetType] = UNSET,
- write: typing.Union[None, float, UnsetType] = UNSET,
- pool: typing.Union[None, float, UnsetType] = UNSET,
+ connect: None | float | UnsetType = UNSET,
+ read: None | float | UnsetType = UNSET,
+ write: None | float | UnsetType = UNSET,
+ pool: None | float | UnsetType = UNSET,
) -> None:
if isinstance(timeout, Timeout):
# Passed as a single explicit Timeout.
self.write = timeout if isinstance(write, UnsetType) else write
self.pool = timeout if isinstance(pool, UnsetType) else pool
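
`Timeout` accepts either a single default or per-phase overrides (connect/read/write/pool); a usage sketch consistent with the documented API:

import httpx

# One value applied to all four phases.
timeout = httpx.Timeout(10.0)

# A default with a stricter connect phase.
timeout = httpx.Timeout(10.0, connect=5.0)

client = httpx.Client(timeout=timeout)
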
- def as_dict(self) -> typing.Dict[str, typing.Optional[float]]:
+ def as_dict(self) -> dict[str, float | None]:
return {
"connect": self.connect,
"read": self.read,
def __init__(
self,
*,
- max_connections: typing.Optional[int] = None,
- max_keepalive_connections: typing.Optional[int] = None,
- keepalive_expiry: typing.Optional[float] = 5.0,
+ max_connections: int | None = None,
+ max_keepalive_connections: int | None = None,
+ keepalive_expiry: float | None = 5.0,
) -> None:
self.max_connections = max_connections
self.max_keepalive_connections = max_keepalive_connections
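
`Limits` configures connection pooling; a usage sketch with the documented keyword arguments:

import httpx

limits = httpx.Limits(
    max_connections=100,
    max_keepalive_connections=20,
    keepalive_expiry=5.0,
)
client = httpx.Client(limits=limits)
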
self,
url: URLTypes,
*,
- ssl_context: typing.Optional[ssl.SSLContext] = None,
- auth: typing.Optional[typing.Tuple[str, str]] = None,
- headers: typing.Optional[HeaderTypes] = None,
+ ssl_context: ssl.SSLContext | None = None,
+ auth: tuple[str, str] | None = None,
+ headers: HeaderTypes | None = None,
) -> None:
url = URL(url)
headers = Headers(headers)
self.ssl_context = ssl_context
@property
- def raw_auth(self) -> typing.Optional[typing.Tuple[bytes, bytes]]:
+ def raw_auth(self) -> tuple[bytes, bytes] | None:
# The proxy authentication as raw bytes.
return (
None
+from __future__ import annotations
+
import inspect
import warnings
from json import dumps as json_dumps
Any,
AsyncIterable,
AsyncIterator,
- Dict,
Iterable,
Iterator,
Mapping,
- Optional,
- Tuple,
- Union,
)
from urllib.parse import urlencode
def encode_content(
- content: Union[str, bytes, Iterable[bytes], AsyncIterable[bytes]],
-) -> Tuple[Dict[str, str], Union[SyncByteStream, AsyncByteStream]]:
+ content: str | bytes | Iterable[bytes] | AsyncIterable[bytes],
+) -> tuple[dict[str, str], SyncByteStream | AsyncByteStream]:
if isinstance(content, (bytes, str)):
body = content.encode("utf-8") if isinstance(content, str) else content
content_length = len(body)
def encode_urlencoded_data(
data: RequestData,
-) -> Tuple[Dict[str, str], ByteStream]:
+) -> tuple[dict[str, str], ByteStream]:
plain_data = []
for key, value in data.items():
if isinstance(value, (list, tuple)):
def encode_multipart_data(
- data: RequestData, files: RequestFiles, boundary: Optional[bytes]
-) -> Tuple[Dict[str, str], MultipartStream]:
+ data: RequestData, files: RequestFiles, boundary: bytes | None
+) -> tuple[dict[str, str], MultipartStream]:
multipart = MultipartStream(data=data, files=files, boundary=boundary)
headers = multipart.get_headers()
return headers, multipart
-def encode_text(text: str) -> Tuple[Dict[str, str], ByteStream]:
+def encode_text(text: str) -> tuple[dict[str, str], ByteStream]:
body = text.encode("utf-8")
content_length = str(len(body))
content_type = "text/plain; charset=utf-8"
return headers, ByteStream(body)
-def encode_html(html: str) -> Tuple[Dict[str, str], ByteStream]:
+def encode_html(html: str) -> tuple[dict[str, str], ByteStream]:
body = html.encode("utf-8")
content_length = str(len(body))
content_type = "text/html; charset=utf-8"
return headers, ByteStream(body)
-def encode_json(json: Any) -> Tuple[Dict[str, str], ByteStream]:
+def encode_json(json: Any) -> tuple[dict[str, str], ByteStream]:
body = json_dumps(json).encode("utf-8")
content_length = str(len(body))
content_type = "application/json"
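
These encoders are internal helpers, but their effect is visible on any request; a sketch of what the `json=` path produces at the request level:

import httpx

request = httpx.Request("POST", "https://www.example.org/", json={"hello": "world"})
print(request.headers["Content-Type"])    # application/json
print(request.headers["Content-Length"])  # 18
print(request.content)                    # b'{"hello": "world"}'
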
def encode_request(
- content: Optional[RequestContent] = None,
- data: Optional[RequestData] = None,
- files: Optional[RequestFiles] = None,
- json: Optional[Any] = None,
- boundary: Optional[bytes] = None,
-) -> Tuple[Dict[str, str], Union[SyncByteStream, AsyncByteStream]]:
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: Any | None = None,
+ boundary: bytes | None = None,
+) -> tuple[dict[str, str], SyncByteStream | AsyncByteStream]:
"""
Handles encoding the given `content`, `data`, `files`, and `json`,
returning a two-tuple of (<headers>, <stream>).
def encode_response(
- content: Optional[ResponseContent] = None,
- text: Optional[str] = None,
- html: Optional[str] = None,
- json: Optional[Any] = None,
-) -> Tuple[Dict[str, str], Union[SyncByteStream, AsyncByteStream]]:
+ content: ResponseContent | None = None,
+ text: str | None = None,
+ html: str | None = None,
+ json: Any | None = None,
+) -> tuple[dict[str, str], SyncByteStream | AsyncByteStream]:
"""
Handles encoding the given `content`, returning a two-tuple of
(<headers>, <stream>).
See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
"""
+from __future__ import annotations
+
import codecs
import io
import typing
Handles returning byte content in fixed-size chunks.
"""
- def __init__(self, chunk_size: typing.Optional[int] = None) -> None:
+ def __init__(self, chunk_size: int | None = None) -> None:
self._buffer = io.BytesIO()
self._chunk_size = chunk_size
- def decode(self, content: bytes) -> typing.List[bytes]:
+ def decode(self, content: bytes) -> list[bytes]:
if self._chunk_size is None:
return [content] if content else []
else:
return []
- def flush(self) -> typing.List[bytes]:
+ def flush(self) -> list[bytes]:
value = self._buffer.getvalue()
self._buffer.seek(0)
self._buffer.truncate()
Handles returning text content in fixed-size chunks.
"""
- def __init__(self, chunk_size: typing.Optional[int] = None) -> None:
+ def __init__(self, chunk_size: int | None = None) -> None:
self._buffer = io.StringIO()
self._chunk_size = chunk_size
- def decode(self, content: str) -> typing.List[str]:
+ def decode(self, content: str) -> list[str]:
if self._chunk_size is None:
return [content] if content else []
else:
return []
- def flush(self) -> typing.List[str]:
+ def flush(self) -> list[str]:
value = self._buffer.getvalue()
self._buffer.seek(0)
self._buffer.truncate()
"""
def __init__(self) -> None:
- self.buffer: typing.List[str] = []
+ self.buffer: list[str] = []
self.trailing_cr: bool = False
- def decode(self, text: str) -> typing.List[str]:
+ def decode(self, text: str) -> list[str]:
# See https://docs.python.org/3/library/stdtypes.html#str.splitlines
NEWLINE_CHARS = "\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029"
return lines
- def flush(self) -> typing.List[str]:
+ def flush(self) -> list[str]:
if not self.buffer and not self.trailing_cr:
return []
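
These chunkers are what let the iteration APIs honour a caller-supplied chunk size; a sketch of the user-facing behaviour:

import httpx

response = httpx.Response(200, content=b"hello, world")
for chunk in response.iter_bytes(chunk_size=5):
    print(chunk)
# b'hello'
# b', wor'
# b'ld'
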
x ResponseNotRead
x RequestNotRead
"""
+from __future__ import annotations
+
import contextlib
import typing
def __init__(self, message: str) -> None:
super().__init__(message)
- self._request: typing.Optional["Request"] = None
+ self._request: Request | None = None
@property
- def request(self) -> "Request":
+ def request(self) -> Request:
if self._request is None:
raise RuntimeError("The .request property has not been set.")
return self._request
@request.setter
- def request(self, request: "Request") -> None:
+ def request(self, request: Request) -> None:
self._request = request
Base class for all exceptions that may occur when issuing a `.request()`.
"""
- def __init__(
- self, message: str, *, request: typing.Optional["Request"] = None
- ) -> None:
+ def __init__(self, message: str, *, request: Request | None = None) -> None:
super().__init__(message)
# At the point an exception is raised we won't typically have a request
# instance to associate it with.
May be raised when calling `response.raise_for_status()`
"""
- def __init__(
- self, message: str, *, request: "Request", response: "Response"
- ) -> None:
+ def __init__(self, message: str, *, request: Request, response: Response) -> None:
super().__init__(message)
self.request = request
self.response = response
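
`HTTPStatusError` carries both the request and the response, which is how `raise_for_status()` reports failures; a usage sketch:

import httpx

request = httpx.Request("GET", "https://www.example.org/")
response = httpx.Response(404, request=request)
try:
    response.raise_for_status()
except httpx.HTTPStatusError as exc:
    print(exc.request.url)           # https://www.example.org/
    print(exc.response.status_code)  # 404
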
@contextlib.contextmanager
def request_context(
- request: typing.Optional["Request"] = None,
+ request: Request | None = None,
) -> typing.Iterator[None]:
"""
A context manager that can be used to attach the given request context
+from __future__ import annotations
+
import functools
import json
import sys
def format_response_headers(
http_version: bytes,
status: int,
- reason_phrase: typing.Optional[bytes],
- headers: typing.List[typing.Tuple[bytes, bytes]],
+ reason_phrase: bytes | None,
+ headers: list[tuple[bytes, bytes]],
) -> str:
version = http_version.decode("ascii")
reason = (
def print_response_headers(
http_version: bytes,
status: int,
- reason_phrase: typing.Optional[bytes],
- headers: typing.List[typing.Tuple[bytes, bytes]],
+ reason_phrase: bytes | None,
+ headers: list[tuple[bytes, bytes]],
) -> None:
console = rich.console.Console()
http_text = format_response_headers(http_version, status, reason_phrase, headers)
def validate_json(
ctx: click.Context,
- param: typing.Union[click.Option, click.Parameter],
+ param: click.Option | click.Parameter,
value: typing.Any,
) -> typing.Any:
if value is None:
def validate_auth(
ctx: click.Context,
- param: typing.Union[click.Option, click.Parameter],
+ param: click.Option | click.Parameter,
value: typing.Any,
) -> typing.Any:
if value == (None, None):
def handle_help(
ctx: click.Context,
- param: typing.Union[click.Option, click.Parameter],
+ param: click.Option | click.Parameter,
value: typing.Any,
) -> None:
if not value or ctx.resilient_parsing:
def main(
url: str,
method: str,
- params: typing.List[typing.Tuple[str, str]],
+ params: list[tuple[str, str]],
content: str,
- data: typing.List[typing.Tuple[str, str]],
- files: typing.List[typing.Tuple[str, click.File]],
+ data: list[tuple[str, str]],
+ files: list[tuple[str, click.File]],
json: str,
- headers: typing.List[typing.Tuple[str, str]],
- cookies: typing.List[typing.Tuple[str, str]],
- auth: typing.Optional[typing.Tuple[str, str]],
+ headers: list[tuple[str, str]],
+ cookies: list[tuple[str, str]],
+ auth: tuple[str, str] | None,
proxy: str,
timeout: float,
follow_redirects: bool,
verify: bool,
http2: bool,
- download: typing.Optional[typing.BinaryIO],
+ download: typing.BinaryIO | None,
verbose: bool,
) -> None:
"""
+from __future__ import annotations
+
import datetime
import email.message
import json as jsonlib
def __init__(
self,
- headers: typing.Optional[HeaderTypes] = None,
- encoding: typing.Optional[str] = None,
+ headers: HeaderTypes | None = None,
+ encoding: str | None = None,
) -> None:
if headers is None:
self._list = [] # type: typing.List[typing.Tuple[bytes, bytes, bytes]]
self._encoding = value
@property
- def raw(self) -> typing.List[typing.Tuple[bytes, bytes]]:
+ def raw(self) -> list[tuple[bytes, bytes]]:
"""
Returns a list of the raw header items, as byte pairs.
"""
return {key.decode(self.encoding): None for _, key, value in self._list}.keys()
def values(self) -> typing.ValuesView[str]:
- values_dict: typing.Dict[str, str] = {}
+ values_dict: dict[str, str] = {}
for _, key, value in self._list:
str_key = key.decode(self.encoding)
str_value = value.decode(self.encoding)
Return `(key, value)` items of headers. Concatenate headers
into a single comma separated value when a key occurs multiple times.
"""
- values_dict: typing.Dict[str, str] = {}
+ values_dict: dict[str, str] = {}
for _, key, value in self._list:
str_key = key.decode(self.encoding)
str_value = value.decode(self.encoding)
values_dict[str_key] = str_value
return values_dict.items()
- def multi_items(self) -> typing.List[typing.Tuple[str, str]]:
+ def multi_items(self) -> list[tuple[str, str]]:
"""
Return a list of `(key, value)` pairs of headers. Allow multiple
occurrences of the same key without concatenating into a single
except KeyError:
return default
- def get_list(self, key: str, split_commas: bool = False) -> typing.List[str]:
+ def get_list(self, key: str, split_commas: bool = False) -> list[str]:
"""
Return a list of all header values for a given key.
If `split_commas=True` is passed, then any comma separated header
split_values.extend([item.strip() for item in value.split(",")])
return split_values
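
`get_list()` and `multi_items()` are the multi-value counterparts to plain indexing, which joins repeated keys with commas; a usage sketch:

import httpx

headers = httpx.Headers([("set-cookie", "a=1"), ("set-cookie", "b=2")])
print(headers["set-cookie"])           # a=1, b=2
print(headers.get_list("set-cookie"))  # ['a=1', 'b=2']
print(headers.multi_items())           # [('set-cookie', 'a=1'), ('set-cookie', 'b=2')]
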
- def update(self, headers: typing.Optional[HeaderTypes] = None) -> None: # type: ignore
+ def update(self, headers: HeaderTypes | None = None) -> None: # type: ignore
headers = Headers(headers)
for key in headers.keys():
if key in self:
self.pop(key)
self._list.extend(headers._list)
- def copy(self) -> "Headers":
+ def copy(self) -> Headers:
return Headers(self, encoding=self.encoding)
def __getitem__(self, key: str) -> str:
class Request:
def __init__(
self,
- method: typing.Union[str, bytes],
- url: typing.Union["URL", str],
+ method: str | bytes,
+ url: URL | str,
*,
- params: typing.Optional[QueryParamTypes] = None,
- headers: typing.Optional[HeaderTypes] = None,
- cookies: typing.Optional[CookieTypes] = None,
- content: typing.Optional[RequestContent] = None,
- data: typing.Optional[RequestData] = None,
- files: typing.Optional[RequestFiles] = None,
- json: typing.Optional[typing.Any] = None,
- stream: typing.Union[SyncByteStream, AsyncByteStream, None] = None,
- extensions: typing.Optional[RequestExtensions] = None,
+ params: QueryParamTypes | None = None,
+ headers: HeaderTypes | None = None,
+ cookies: CookieTypes | None = None,
+ content: RequestContent | None = None,
+ data: RequestData | None = None,
+ files: RequestFiles | None = None,
+ json: typing.Any | None = None,
+ stream: SyncByteStream | AsyncByteStream | None = None,
+ extensions: RequestExtensions | None = None,
) -> None:
self.method = (
method.decode("ascii").upper()
Cookies(cookies).set_cookie_header(self)
if stream is None:
- content_type: typing.Optional[str] = self.headers.get("content-type")
+ content_type: str | None = self.headers.get("content-type")
headers, stream = encode_request(
content=content,
data=data,
# * Creating request instances on the *server-side* of the transport API.
self.stream = stream
- def _prepare(self, default_headers: typing.Dict[str, str]) -> None:
+ def _prepare(self, default_headers: dict[str, str]) -> None:
for key, value in default_headers.items():
# Ignore Transfer-Encoding if the Content-Length has been set explicitly.
if key.lower() == "transfer-encoding" and "Content-Length" in self.headers:
continue
self.headers.setdefault(key, value)
- auto_headers: typing.List[typing.Tuple[bytes, bytes]] = []
+ auto_headers: list[tuple[bytes, bytes]] = []
has_host = "Host" in self.headers
has_content_length = (
url = str(self.url)
return f"<{class_name}({self.method!r}, {url!r})>"
- def __getstate__(self) -> typing.Dict[str, typing.Any]:
+ def __getstate__(self) -> dict[str, typing.Any]:
return {
name: value
for name, value in self.__dict__.items()
if name not in ["extensions", "stream"]
}
- def __setstate__(self, state: typing.Dict[str, typing.Any]) -> None:
+ def __setstate__(self, state: dict[str, typing.Any]) -> None:
for name, value in state.items():
setattr(self, name, value)
self.extensions = {}
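
For context, a small sketch of how a `Request` built with these keyword arguments behaves (the URL and header values are placeholders):

```python
import httpx

request = httpx.Request(
    "get",
    "https://example.org/search",
    params={"q": "httpx"},
    headers={"accept": "application/json"},
)

assert request.method == "GET"  # str/bytes methods are normalised to an upper-cased str
assert str(request.url) == "https://example.org/search?q=httpx"
assert request.headers["accept"] == "application/json"
assert request.headers["host"] == "example.org"  # auto-populated default header
```
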
self,
status_code: int,
*,
- headers: typing.Optional[HeaderTypes] = None,
- content: typing.Optional[ResponseContent] = None,
- text: typing.Optional[str] = None,
- html: typing.Optional[str] = None,
+ headers: HeaderTypes | None = None,
+ content: ResponseContent | None = None,
+ text: str | None = None,
+ html: str | None = None,
json: typing.Any = None,
- stream: typing.Union[SyncByteStream, AsyncByteStream, None] = None,
- request: typing.Optional[Request] = None,
- extensions: typing.Optional[ResponseExtensions] = None,
- history: typing.Optional[typing.List["Response"]] = None,
- default_encoding: typing.Union[str, typing.Callable[[bytes], str]] = "utf-8",
+ stream: SyncByteStream | AsyncByteStream | None = None,
+ request: Request | None = None,
+ extensions: ResponseExtensions | None = None,
+ history: list[Response] | None = None,
+ default_encoding: str | typing.Callable[[bytes], str] = "utf-8",
) -> None:
self.status_code = status_code
self.headers = Headers(headers)
- self._request: typing.Optional[Request] = request
+ self._request: Request | None = request
# When follow_redirects=False and a redirect is received,
# the client will set `response.next_request`.
- self.next_request: typing.Optional[Request] = None
+ self.next_request: Request | None = None
self.extensions: ResponseExtensions = {} if extensions is None else extensions
self.history = [] if history is None else list(history)
self._num_bytes_downloaded = 0
- def _prepare(self, default_headers: typing.Dict[str, str]) -> None:
+ def _prepare(self, default_headers: dict[str, str]) -> None:
for key, value in default_headers.items():
# Ignore Transfer-Encoding if the Content-Length has been set explicitly.
if key.lower() == "transfer-encoding" and "content-length" in self.headers:
return self._text
@property
- def encoding(self) -> typing.Optional[str]:
+ def encoding(self) -> str | None:
"""
Return an encoding to use for decoding the byte content into text.
The priority for determining this is given by...
self._encoding = value
@property
- def charset_encoding(self) -> typing.Optional[str]:
+ def charset_encoding(self) -> str | None:
"""
Return the encoding, as specified by the Content-Type header.
"""
content, depending on the Content-Encoding used in the response.
"""
if not hasattr(self, "_decoder"):
- decoders: typing.List[ContentDecoder] = []
+ decoders: list[ContentDecoder] = []
values = self.headers.get_list("content-encoding", split_commas=True)
for value in values:
value = value.strip().lower()
and "Location" in self.headers
)
- def raise_for_status(self) -> "Response":
+ def raise_for_status(self) -> Response:
"""
Raise the `HTTPStatusError` if one occurred.
"""
return jsonlib.loads(self.content, **kwargs)
@property
- def cookies(self) -> "Cookies":
+ def cookies(self) -> Cookies:
if not hasattr(self, "_cookies"):
self._cookies = Cookies()
self._cookies.extract_cookies(self)
return self._cookies
@property
- def links(self) -> typing.Dict[typing.Optional[str], typing.Dict[str, str]]:
+ def links(self) -> dict[str | None, dict[str, str]]:
"""
Returns the parsed header links of the response, if any
"""
def __repr__(self) -> str:
return f"<Response [{self.status_code} {self.reason_phrase}]>"
- def __getstate__(self) -> typing.Dict[str, typing.Any]:
+ def __getstate__(self) -> dict[str, typing.Any]:
return {
name: value
for name, value in self.__dict__.items()
if name not in ["extensions", "stream", "is_closed", "_decoder"]
}
- def __setstate__(self, state: typing.Dict[str, typing.Any]) -> None:
+ def __setstate__(self, state: dict[str, typing.Any]) -> None:
for name, value in state.items():
setattr(self, name, value)
self.is_closed = True
self._content = b"".join(self.iter_bytes())
return self._content
- def iter_bytes(
- self, chunk_size: typing.Optional[int] = None
- ) -> typing.Iterator[bytes]:
+ def iter_bytes(self, chunk_size: int | None = None) -> typing.Iterator[bytes]:
"""
A byte-iterator over the decoded response content.
This allows us to handle gzip, deflate, and brotli encoded responses.
for chunk in chunker.flush():
yield chunk
- def iter_text(
- self, chunk_size: typing.Optional[int] = None
- ) -> typing.Iterator[str]:
+ def iter_text(self, chunk_size: int | None = None) -> typing.Iterator[str]:
"""
A str-iterator over the decoded response content
that handles gzip, deflate, etc. and also detects the content's
for line in decoder.flush():
yield line
- def iter_raw(
- self, chunk_size: typing.Optional[int] = None
- ) -> typing.Iterator[bytes]:
+ def iter_raw(self, chunk_size: int | None = None) -> typing.Iterator[bytes]:
"""
A byte-iterator over the raw response content.
"""
return self._content
async def aiter_bytes(
- self, chunk_size: typing.Optional[int] = None
+ self, chunk_size: int | None = None
) -> typing.AsyncIterator[bytes]:
"""
A byte-iterator over the decoded response content.
yield chunk
async def aiter_text(
- self, chunk_size: typing.Optional[int] = None
+ self, chunk_size: int | None = None
) -> typing.AsyncIterator[str]:
"""
A str-iterator over the decoded response content
yield line
async def aiter_raw(
- self, chunk_size: typing.Optional[int] = None
+ self, chunk_size: int | None = None
) -> typing.AsyncIterator[bytes]:
"""
A byte-iterator over the raw response content.
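
To illustrate a couple of the behaviours touched above: `raise_for_status()` returns the response itself, which makes it chainable, and `.json()` decodes the body. A sketch with hypothetical in-memory responses:

```python
import httpx

request = httpx.Request("GET", "https://example.org/api")
response = httpx.Response(200, json={"ok": True}, request=request)

# raise_for_status() returns the Response, so calls can be chained.
payload = response.raise_for_status().json()
assert payload == {"ok": True}

# A 4xx/5xx response raises HTTPStatusError instead.
error = httpx.Response(404, request=request)
try:
    error.raise_for_status()
except httpx.HTTPStatusError as exc:
    assert exc.response.status_code == 404
```
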
HTTP Cookies, as a mutable mapping.
"""
- def __init__(self, cookies: typing.Optional[CookieTypes] = None) -> None:
+ def __init__(self, cookies: CookieTypes | None = None) -> None:
if cookies is None or isinstance(cookies, dict):
self.jar = CookieJar()
if isinstance(cookies, dict):
def get( # type: ignore
self,
name: str,
- default: typing.Optional[str] = None,
- domain: typing.Optional[str] = None,
- path: typing.Optional[str] = None,
- ) -> typing.Optional[str]:
+ default: str | None = None,
+ domain: str | None = None,
+ path: str | None = None,
+ ) -> str | None:
"""
Get a cookie by name. May optionally include domain and path
in order to specify exactly which cookie to retrieve.
def delete(
self,
name: str,
- domain: typing.Optional[str] = None,
- path: typing.Optional[str] = None,
+ domain: str | None = None,
+ path: str | None = None,
) -> None:
"""
Delete a cookie by name. May optionally include domain and path
for cookie in remove:
self.jar.clear(cookie.domain, cookie.path, cookie.name)
- def clear(
- self, domain: typing.Optional[str] = None, path: typing.Optional[str] = None
- ) -> None:
+ def clear(self, domain: str | None = None, path: str | None = None) -> None:
"""
Delete all cookies. Optionally include a domain and path in
order to only delete a subset of all the cookies.
args.append(path)
self.jar.clear(*args)
- def update(self, cookies: typing.Optional[CookieTypes] = None) -> None: # type: ignore
+ def update(self, cookies: CookieTypes | None = None) -> None: # type: ignore
cookies = Cookies(cookies)
for cookie in cookies.jar:
self.jar.set_cookie(cookie)
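
A short sketch of the `Cookies` get/delete semantics with explicit domain and path (the cookie values are invented):

```python
import httpx

cookies = httpx.Cookies()
cookies.set("session", "abc123", domain="example.org", path="/")

# Domain and path can be used to pick out exactly which cookie to retrieve.
assert cookies.get("session", domain="example.org", path="/") == "abc123"
assert cookies.get("missing", default="fallback") == "fallback"

cookies.delete("session", domain="example.org", path="/")
assert cookies.get("session") is None
```
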
+from __future__ import annotations
+
import io
import os
import typing
def get_multipart_boundary_from_content_type(
- content_type: typing.Optional[bytes],
-) -> typing.Optional[bytes]:
+ content_type: bytes | None,
+) -> bytes | None:
if not content_type or not content_type.startswith(b"multipart/form-data"):
return None
# parse boundary according to
A single form field item, within a multipart form field.
"""
- def __init__(
- self, name: str, value: typing.Union[str, bytes, int, float, None]
- ) -> None:
+ def __init__(self, name: str, value: str | bytes | int | float | None) -> None:
if not isinstance(name, str):
raise TypeError(
f"Invalid type for name. Expected str, got {type(name)}: {name!r}"
f" got {type(value)}: {value!r}"
)
self.name = name
- self.value: typing.Union[str, bytes] = (
+ self.value: str | bytes = (
value if isinstance(value, bytes) else primitive_value_to_str(value)
)
fileobj: FileContent
- headers: typing.Dict[str, str] = {}
- content_type: typing.Optional[str] = None
+ headers: dict[str, str] = {}
+ content_type: str | None = None
# This large tuple-based API largely mirrors requests' API
# It would be good to think of better APIs for this that we could
self.file = fileobj
self.headers = headers
- def get_length(self) -> typing.Optional[int]:
+ def get_length(self) -> int | None:
headers = self.render_headers()
if isinstance(self.file, (str, bytes)):
self,
data: RequestData,
files: RequestFiles,
- boundary: typing.Optional[bytes] = None,
+ boundary: bytes | None = None,
) -> None:
if boundary is None:
boundary = os.urandom(16).hex().encode("ascii")
def _iter_fields(
self, data: RequestData, files: RequestFiles
- ) -> typing.Iterator[typing.Union[FileField, DataField]]:
+ ) -> typing.Iterator[FileField | DataField]:
for name, value in data.items():
if isinstance(value, (tuple, list)):
for item in value:
yield b"\r\n"
yield b"--%s--\r\n" % self.boundary
- def get_content_length(self) -> typing.Optional[int]:
+ def get_content_length(self) -> int | None:
"""
Return the length of the multipart encoded content, or `None` if
any of the files have a length that cannot be determined upfront.
# Content stream interface.
- def get_headers(self) -> typing.Dict[str, str]:
+ def get_headers(self) -> dict[str, str]:
content_length = self.get_content_length()
content_type = self.content_type
if content_length is None:
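
The multipart machinery above is normally driven through the `data=`/`files=` arguments; a rough sketch of the resulting headers (file name and contents are placeholders):

```python
import io
import httpx

request = httpx.Request(
    "POST",
    "https://example.org/upload",
    data={"label": "greeting"},
    files={"upload": ("hello.txt", io.BytesIO(b"hello, world"), "text/plain")},
)

# The encoder picks a random boundary, and reports an upfront Content-Length
# here because the BytesIO length can be determined without reading it.
assert request.headers["content-type"].startswith("multipart/form-data; boundary=")
assert int(request.headers["content-length"]) > 0
```
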
+from __future__ import annotations
+
from enum import IntEnum
* RFC 8470: Using Early Data in HTTP
"""
- def __new__(cls, value: int, phrase: str = "") -> "codes":
+ def __new__(cls, value: int, phrase: str = "") -> codes:
obj = int.__new__(cls, value)
obj._value_ = value
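
Since `codes` is an `IntEnum`, its members compare equal to plain integers; a quick sketch:

```python
import httpx

assert httpx.codes.NOT_FOUND == 404                       # IntEnum: members compare as ints
assert httpx.codes(404) is httpx.codes.NOT_FOUND          # lookup by value
assert httpx.codes.get_reason_phrase(404) == "Not Found"  # reason-phrase helper
```
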
+from __future__ import annotations
+
import typing
import sniffio
]
-def create_event() -> "Event":
+def create_event() -> Event:
if sniffio.current_async_library() == "trio":
import trio
class ASGIResponseStream(AsyncByteStream):
- def __init__(self, body: typing.List[bytes]) -> None:
+ def __init__(self, body: list[bytes]) -> None:
self._body = body
async def __aiter__(self) -> typing.AsyncIterator[bytes]:
app: _ASGIApp,
raise_app_exceptions: bool = True,
root_path: str = "",
- client: typing.Tuple[str, int] = ("127.0.0.1", 123),
+ client: tuple[str, int] = ("127.0.0.1", 123),
) -> None:
self.app = app
self.raise_app_exceptions = raise_app_exceptions
# ASGI callables.
- async def receive() -> typing.Dict[str, typing.Any]:
+ async def receive() -> dict[str, typing.Any]:
nonlocal request_complete
if request_complete:
return {"type": "http.request", "body": b"", "more_body": False}
return {"type": "http.request", "body": body, "more_body": True}
- async def send(message: typing.Dict[str, typing.Any]) -> None:
+ async def send(message: dict[str, typing.Any]) -> None:
nonlocal status_code, response_headers, response_started
if message["type"] == "http.response.start":
+from __future__ import annotations
+
import typing
from types import TracebackType
def __exit__(
self,
- exc_type: typing.Optional[typing.Type[BaseException]] = None,
- exc_value: typing.Optional[BaseException] = None,
- traceback: typing.Optional[TracebackType] = None,
+ exc_type: type[BaseException] | None = None,
+ exc_value: BaseException | None = None,
+ traceback: TracebackType | None = None,
) -> None:
self.close()
async def __aexit__(
self,
- exc_type: typing.Optional[typing.Type[BaseException]] = None,
- exc_value: typing.Optional[BaseException] = None,
- traceback: typing.Optional[TracebackType] = None,
+ exc_type: type[BaseException] | None = None,
+ exc_value: BaseException | None = None,
+ traceback: TracebackType | None = None,
) -> None:
await self.aclose()
transport = httpx.HTTPTransport(uds="socket.uds")
client = httpx.Client(transport=transport)
"""
+from __future__ import annotations
+
import contextlib
import typing
from types import TracebackType
def __init__(
self,
verify: VerifyTypes = True,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
http1: bool = True,
http2: bool = False,
limits: Limits = DEFAULT_LIMITS,
trust_env: bool = True,
- proxy: typing.Optional[ProxyTypes] = None,
- uds: typing.Optional[str] = None,
- local_address: typing.Optional[str] = None,
+ proxy: ProxyTypes | None = None,
+ uds: str | None = None,
+ local_address: str | None = None,
retries: int = 0,
- socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None,
+ socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
) -> None:
ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)
proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy
def __exit__(
self,
- exc_type: typing.Optional[typing.Type[BaseException]] = None,
- exc_value: typing.Optional[BaseException] = None,
- traceback: typing.Optional[TracebackType] = None,
+ exc_type: type[BaseException] | None = None,
+ exc_value: BaseException | None = None,
+ traceback: TracebackType | None = None,
) -> None:
with map_httpcore_exceptions():
self._pool.__exit__(exc_type, exc_value, traceback)
def __init__(
self,
verify: VerifyTypes = True,
- cert: typing.Optional[CertTypes] = None,
+ cert: CertTypes | None = None,
http1: bool = True,
http2: bool = False,
limits: Limits = DEFAULT_LIMITS,
trust_env: bool = True,
- proxy: typing.Optional[ProxyTypes] = None,
- uds: typing.Optional[str] = None,
- local_address: typing.Optional[str] = None,
+ proxy: ProxyTypes | None = None,
+ uds: str | None = None,
+ local_address: str | None = None,
retries: int = 0,
- socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None,
+ socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
) -> None:
ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)
proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy
async def __aexit__(
self,
- exc_type: typing.Optional[typing.Type[BaseException]] = None,
- exc_value: typing.Optional[BaseException] = None,
- traceback: typing.Optional[TracebackType] = None,
+ exc_type: type[BaseException] | None = None,
+ exc_value: BaseException | None = None,
+ traceback: TracebackType | None = None,
) -> None:
with map_httpcore_exceptions():
await self._pool.__aexit__(exc_type, exc_value, traceback)
+from __future__ import annotations
+
import typing
from .._models import Request, Response
class MockTransport(AsyncBaseTransport, BaseTransport):
- def __init__(self, handler: typing.Union[SyncHandler, AsyncHandler]) -> None:
+ def __init__(self, handler: SyncHandler | AsyncHandler) -> None:
self.handler = handler
def handle_request(
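
For reference, `MockTransport` accepts either a sync or an async handler; a small sketch with a sync handler (the handler logic is invented):

```python
import httpx

def handler(request: httpx.Request) -> httpx.Response:
    # Echo the request path back as JSON, without any network I/O.
    return httpx.Response(200, json={"path": request.url.path})

client = httpx.Client(transport=httpx.MockTransport(handler))
response = client.get("https://example.org/ping")
assert response.json() == {"path": "/ping"}
```
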
+from __future__ import annotations
+
import io
import itertools
import sys
def __init__(
self,
- app: "WSGIApplication",
+ app: WSGIApplication,
raise_app_exceptions: bool = True,
script_name: str = "",
remote_addr: str = "127.0.0.1",
- wsgi_errors: typing.Optional[typing.TextIO] = None,
+ wsgi_errors: typing.TextIO | None = None,
) -> None:
self.app = app
self.raise_app_exceptions = raise_app_exceptions
def start_response(
status: str,
- response_headers: typing.List[typing.Tuple[str, str]],
- exc_info: typing.Optional["OptExcInfo"] = None,
+ response_headers: list[tuple[str, str]],
+ exc_info: OptExcInfo | None = None,
) -> typing.Callable[[bytes], typing.Any]:
nonlocal seen_status, seen_response_headers, seen_exc_info
seen_status = status
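
Similarly to the ASGI case, `WSGITransport` lets a WSGI app be called in-process; a small sketch (the app is a placeholder):

```python
import httpx

def app(environ, start_response):
    # Trivial WSGI app returning a fixed body.
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"Hello, WSGI!"]

transport = httpx.WSGITransport(app=app)
with httpx.Client(transport=transport, base_url="https://testserver") as client:
    response = client.get("/")
    assert response.status_code == 200
    assert response.text == "Hello, WSGI!"
```
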
validation, but this module provides a simpler alternative, with less indirection
required.
"""
+from __future__ import annotations
+
import ipaddress
import re
import typing
scheme: str
userinfo: str
host: str
- port: typing.Optional[int]
+ port: int | None
path: str
- query: typing.Optional[str]
- fragment: typing.Optional[str]
+ query: str | None
+ fragment: str | None
@property
def authority(self) -> str:
]
)
- def copy_with(self, **kwargs: typing.Optional[str]) -> "ParseResult":
+ def copy_with(self, **kwargs: str | None) -> ParseResult:
if not kwargs:
return self
)
-def urlparse(url: str = "", **kwargs: typing.Optional[str]) -> ParseResult:
+def urlparse(url: str = "", **kwargs: str | None) -> ParseResult:
# Initial basic checks on allowable URLs.
# ---------------------------------------
parsed_scheme: str = scheme.lower()
parsed_userinfo: str = quote(userinfo, safe=SUB_DELIMS + ":")
parsed_host: str = encode_host(host)
- parsed_port: typing.Optional[int] = normalize_port(port, scheme)
+ parsed_port: int | None = normalize_port(port, scheme)
has_scheme = parsed_scheme != ""
has_authority = (
# For 'path' we need to drop ? and # from the GEN_DELIMS set.
parsed_path: str = quote(path, safe=SUB_DELIMS + ":/[]@")
# For 'query' we need to drop '#' from the GEN_DELIMS set.
- parsed_query: typing.Optional[str] = (
+ parsed_query: str | None = (
None if query is None else quote(query, safe=SUB_DELIMS + ":/?[]@")
)
# For 'fragment' we can include all of the GEN_DELIMS set.
- parsed_fragment: typing.Optional[str] = (
+ parsed_fragment: str | None = (
None if fragment is None else quote(fragment, safe=SUB_DELIMS + ":/?#[]@")
)
raise InvalidURL(f"Invalid IDNA hostname: {host!r}")
-def normalize_port(
- port: typing.Optional[typing.Union[str, int]], scheme: str
-) -> typing.Optional[int]:
+def normalize_port(port: str | int | None, scheme: str) -> int | None:
# From https://tools.ietf.org/html/rfc3986#section-3.2.3
#
# "A scheme may define a default port. For example, the "http" scheme
"""
# https://datatracker.ietf.org/doc/html/rfc3986#section-5.2.4
components = path.split("/")
- output: typing.List[str] = []
+ output: list[str] = []
for component in components:
if component == ".":
pass
return "".join(parts)
-def urlencode(items: typing.List[typing.Tuple[str, str]]) -> str:
+def urlencode(items: list[tuple[str, str]]) -> str:
"""
We can use a much simpler version of the stdlib urlencode here because
we don't need to handle a bunch of different typing cases, such as bytes vs str.
+from __future__ import annotations
+
import typing
from urllib.parse import parse_qs, unquote
themselves.
"""
- def __init__(
- self, url: typing.Union["URL", str] = "", **kwargs: typing.Any
- ) -> None:
+ def __init__(self, url: URL | str = "", **kwargs: typing.Any) -> None:
if kwargs:
allowed = {
"scheme": str,
return self._uri_reference.host.encode("ascii")
@property
- def port(self) -> typing.Optional[int]:
+ def port(self) -> int | None:
"""
The URL port as an integer.
return query.encode("ascii")
@property
- def params(self) -> "QueryParams":
+ def params(self) -> QueryParams:
"""
The URL query parameters, neatly parsed and packaged into an immutable
multidict representation.
"""
return not self.is_absolute_url
- def copy_with(self, **kwargs: typing.Any) -> "URL":
+ def copy_with(self, **kwargs: typing.Any) -> URL:
"""
Copy this URL, returning a new URL with some components altered.
Accepts the same set of parameters as the components that are made
"""
return URL(self, **kwargs)
- def copy_set_param(self, key: str, value: typing.Any = None) -> "URL":
+ def copy_set_param(self, key: str, value: typing.Any = None) -> URL:
return self.copy_with(params=self.params.set(key, value))
- def copy_add_param(self, key: str, value: typing.Any = None) -> "URL":
+ def copy_add_param(self, key: str, value: typing.Any = None) -> URL:
return self.copy_with(params=self.params.add(key, value))
- def copy_remove_param(self, key: str) -> "URL":
+ def copy_remove_param(self, key: str) -> URL:
return self.copy_with(params=self.params.remove(key))
- def copy_merge_params(self, params: QueryParamTypes) -> "URL":
+ def copy_merge_params(self, params: QueryParamTypes) -> URL:
return self.copy_with(params=self.params.merge(params))
- def join(self, url: URLTypes) -> "URL":
+ def join(self, url: URLTypes) -> URL:
"""
Return an absolute URL, using this URL as the base.
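
A quick sketch of the copy/join helpers on `httpx.URL` (the URLs are placeholders; each helper returns a new `URL`, leaving the original untouched):

```python
import httpx

url = httpx.URL("https://example.org/path?page=1")

assert url.join("/other") == "https://example.org/other"           # uses this URL as the base
assert str(url.copy_set_param("page", 2)) == "https://example.org/path?page=2"
assert url.copy_with(scheme="http", port=8080).port == 8080
assert url.params["page"] == "1"                                    # original is unchanged
```
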
URL query parameters, as a multi-dict.
"""
- def __init__(
- self, *args: typing.Optional[QueryParamTypes], **kwargs: typing.Any
- ) -> None:
+ def __init__(self, *args: QueryParamTypes | None, **kwargs: typing.Any) -> None:
assert len(args) < 2, "Too many arguments."
assert not (args and kwargs), "Cannot mix named and unnamed arguments."
elif isinstance(value, QueryParams):
self._dict = {k: list(v) for k, v in value._dict.items()}
else:
- dict_value: typing.Dict[typing.Any, typing.List[typing.Any]] = {}
+ dict_value: dict[typing.Any, list[typing.Any]] = {}
if isinstance(value, (list, tuple)):
# Convert list inputs like:
# [("a", "123"), ("a", "456"), ("b", "789")]
"""
return {k: v[0] for k, v in self._dict.items()}.items()
- def multi_items(self) -> typing.List[typing.Tuple[str, str]]:
+ def multi_items(self) -> list[tuple[str, str]]:
"""
Return all items in the query params. Allow duplicate keys to occur.
q = httpx.QueryParams("a=123&a=456&b=789")
assert list(q.multi_items()) == [("a", "123"), ("a", "456"), ("b", "789")]
"""
- multi_items: typing.List[typing.Tuple[str, str]] = []
+ multi_items: list[tuple[str, str]] = []
for k, v in self._dict.items():
multi_items.extend([(k, i) for i in v])
return multi_items
return self._dict[str(key)][0]
return default
- def get_list(self, key: str) -> typing.List[str]:
+ def get_list(self, key: str) -> list[str]:
"""
Get all values from the query param for a given key.
"""
return list(self._dict.get(str(key), []))
- def set(self, key: str, value: typing.Any = None) -> "QueryParams":
+ def set(self, key: str, value: typing.Any = None) -> QueryParams:
"""
Return a new QueryParams instance, setting the value of a key.
q._dict[str(key)] = [primitive_value_to_str(value)]
return q
- def add(self, key: str, value: typing.Any = None) -> "QueryParams":
+ def add(self, key: str, value: typing.Any = None) -> QueryParams:
"""
Return a new QueryParams instance, setting or appending the value of a key.
q._dict[str(key)] = q.get_list(key) + [primitive_value_to_str(value)]
return q
- def remove(self, key: str) -> "QueryParams":
+ def remove(self, key: str) -> QueryParams:
"""
Return a new QueryParams instance, removing the value of a key.
q._dict.pop(str(key), None)
return q
- def merge(self, params: typing.Optional[QueryParamTypes] = None) -> "QueryParams":
+ def merge(self, params: QueryParamTypes | None = None) -> QueryParams:
"""
Return a new QueryParams instance, updated with the given params.
query_string = str(self)
return f"{class_name}({query_string!r})"
- def update(self, params: typing.Optional[QueryParamTypes] = None) -> None:
+ def update(self, params: QueryParamTypes | None = None) -> None:
raise RuntimeError(
"QueryParams are immutable since 0.18.0. "
"Use `q = q.merge(...)` to create an updated copy."
+from __future__ import annotations
+
import codecs
import email.message
import ipaddress
def normalize_header_key(
- value: typing.Union[str, bytes],
+ value: str | bytes,
lower: bool,
- encoding: typing.Optional[str] = None,
+ encoding: str | None = None,
) -> bytes:
"""
Coerce str/bytes into a strictly byte-wise HTTP header key.
return bytes_value.lower() if lower else bytes_value
-def normalize_header_value(
- value: typing.Union[str, bytes], encoding: typing.Optional[str] = None
-) -> bytes:
+def normalize_header_value(value: str | bytes, encoding: str | None = None) -> bytes:
"""
Coerce str/bytes into a strictly byte-wise HTTP header value.
"""
return value.encode(encoding or "ascii")
-def primitive_value_to_str(value: "PrimitiveData") -> str:
+def primitive_value_to_str(value: PrimitiveData) -> str:
"""
Coerce a primitive data type into a string value.
return f'{name}="{value}"'.encode()
-def get_ca_bundle_from_env() -> typing.Optional[str]:
+def get_ca_bundle_from_env() -> str | None:
if "SSL_CERT_FILE" in os.environ:
ssl_file = Path(os.environ["SSL_CERT_FILE"])
if ssl_file.is_file():
return None
-def parse_header_links(value: str) -> typing.List[typing.Dict[str, str]]:
+def parse_header_links(value: str) -> list[dict[str, str]]:
"""
Returns a list of parsed link headers, for more info see:
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link
:param value: HTTP Link entity-header field
:return: list of parsed link headers
"""
- links: typing.List[typing.Dict[str, str]] = []
+ links: list[dict[str, str]] = []
replace_chars = " '\""
value = value.strip(replace_chars)
if not value:
return links
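
The parsed link structure surfaces publicly through `Response.links`; a small sketch with an invented `Link` header:

```python
import httpx

response = httpx.Response(
    200,
    headers={"Link": '<https://example.org/?page=2>; rel="next"'},
)

# Keyed by the `rel` value when present, falling back to the URL otherwise.
assert response.links["next"]["url"] == "https://example.org/?page=2"
assert response.links["next"]["rel"] == "next"
```
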
-def parse_content_type_charset(content_type: str) -> typing.Optional[str]:
+def parse_content_type_charset(content_type: str) -> str | None:
# We used to use `cgi.parse_header()` here, but `cgi` became a dead battery.
# See: https://peps.python.org/pep-0594/#cgi
msg = email.message.Message()
def obfuscate_sensitive_headers(
- items: typing.Iterable[typing.Tuple[typing.AnyStr, typing.AnyStr]],
-) -> typing.Iterator[typing.Tuple[typing.AnyStr, typing.AnyStr]]:
+ items: typing.Iterable[tuple[typing.AnyStr, typing.AnyStr]],
+) -> typing.Iterator[tuple[typing.AnyStr, typing.AnyStr]]:
for k, v in items:
if to_str(k.lower()) in SENSITIVE_HEADERS:
v = to_bytes_or_str("[secure]", match_type_of=v)
yield k, v
-def port_or_default(url: "URL") -> typing.Optional[int]:
+def port_or_default(url: URL) -> int | None:
if url.port is not None:
return url.port
return {"http": 80, "https": 443}.get(url.scheme)
-def same_origin(url: "URL", other: "URL") -> bool:
+def same_origin(url: URL, other: URL) -> bool:
"""
Return 'True' if the given URLs share the same origin.
"""
)
-def is_https_redirect(url: "URL", location: "URL") -> bool:
+def is_https_redirect(url: URL, location: URL) -> bool:
"""
Return 'True' if 'location' is an HTTPS upgrade of 'url'
"""
)
-def get_environment_proxies() -> typing.Dict[str, typing.Optional[str]]:
+def get_environment_proxies() -> dict[str, str | None]:
"""Gets proxy information from the environment"""
# urllib.request.getproxies() falls back on System
# We don't want to propagate non-HTTP proxies into
# our configuration such as 'TRAVIS_APT_PROXY'.
proxy_info = getproxies()
- mounts: typing.Dict[str, typing.Optional[str]] = {}
+ mounts: dict[str, str | None] = {}
for scheme in ("http", "https", "all"):
if proxy_info.get(scheme):
return mounts
-def to_bytes(value: typing.Union[str, bytes], encoding: str = "utf-8") -> bytes:
+def to_bytes(value: str | bytes, encoding: str = "utf-8") -> bytes:
return value.encode(encoding) if isinstance(value, str) else value
-def to_str(value: typing.Union[str, bytes], encoding: str = "utf-8") -> str:
+def to_str(value: str | bytes, encoding: str = "utf-8") -> str:
return value if isinstance(value, str) else value.decode(encoding)
return value[1:-1] if value[0] == value[-1] == '"' else value
-def guess_content_type(filename: typing.Optional[str]) -> typing.Optional[str]:
+def guess_content_type(filename: str | None) -> str | None:
if filename:
return mimetypes.guess_type(filename)[0] or "application/octet-stream"
return None
-def peek_filelike_length(stream: typing.Any) -> typing.Optional[int]:
+def peek_filelike_length(stream: typing.Any) -> int | None:
"""
Given a file-like stream object, return its length in number of bytes
without reading it into memory.
self.host = "" if url.host == "*" else url.host
self.port = url.port
if not url.host or url.host == "*":
- self.host_regex: typing.Optional[typing.Pattern[str]] = None
+ self.host_regex: typing.Pattern[str] | None = None
elif url.host.startswith("*."):
# *.example.com should match "www.example.com", but not "example.com"
domain = re.escape(url.host[2:])
domain = re.escape(url.host)
self.host_regex = re.compile(f"^{domain}$")
- def matches(self, other: "URL") -> bool:
+ def matches(self, other: URL) -> bool:
if self.scheme and self.scheme != other.scheme:
return False
if (
return True
@property
- def priority(self) -> typing.Tuple[int, int, int]:
+ def priority(self) -> tuple[int, int, int]:
"""
The priority allows URLPattern instances to be sortable, so that
we can match from most specific to least specific.
def __hash__(self) -> int:
return hash(self.pattern)
- def __lt__(self, other: "URLPattern") -> bool:
+ def __lt__(self, other: URLPattern) -> bool:
return self.priority < other.priority
def __eq__(self, other: typing.Any) -> bool:
+from __future__ import annotations
+
import typing
from datetime import timedelta
async def test_context_managed_transport():
class Transport(httpx.AsyncBaseTransport):
def __init__(self) -> None:
- self.events: typing.List[str] = []
+ self.events: list[str] = []
async def aclose(self):
# The base implementation of httpx.AsyncBaseTransport just
class Transport(httpx.AsyncBaseTransport):
def __init__(self, name: str) -> None:
self.name: str = name
- self.events: typing.List[str] = []
+ self.events: list[str] = []
async def aclose(self):
# The base implementation of httpx.AsyncBaseTransport just
+from __future__ import annotations
+
import typing
from datetime import timedelta
def test_context_managed_transport():
class Transport(httpx.BaseTransport):
def __init__(self) -> None:
- self.events: typing.List[str] = []
+ self.events: list[str] = []
def close(self):
# The base implementation of httpx.BaseTransport just
class Transport(httpx.BaseTransport):
def __init__(self, name: str) -> None:
self.name: str = name
- self.events: typing.List[str] = []
+ self.events: list[str] = []
def close(self):
# The base implementation of httpx.BaseTransport just
+from __future__ import annotations
+
import typing
import zlib
[((b"Hello,", b" world!"), ["Hello,", " world!"])],
)
def test_streaming_text_decoder(
- data: typing.Iterable[bytes], expected: typing.List[str]
+ data: typing.Iterable[bytes], expected: list[str]
) -> None:
response = httpx.Response(200, content=iter(data))
assert list(response.iter_text()) == expected
+from __future__ import annotations
+
import typing
import httpcore
pytest.fail(f"Unmapped httpcore exceptions: {unmapped_exceptions}")
-def test_httpcore_exception_mapping(server: "TestServer") -> None:
+def test_httpcore_exception_mapping(server: TestServer) -> None:
"""
HTTPCore exception mapping works as expected.
"""
+from __future__ import annotations
+
import io
import tempfile
import typing
@pytest.mark.parametrize("file_content_type", [None, "text/plain"])
-def test_multipart_file_tuple_headers(file_content_type: typing.Optional[str]) -> None:
+def test_multipart_file_tuple_headers(file_content_type: str | None) -> None:
file_name = "test.txt"
file_content = io.BytesIO(b"<file content>")
file_headers = {"Expires": "0"}
+from __future__ import annotations
+
import sys
import typing
import wsgiref.validate
from _typeshed.wsgi import StartResponse, WSGIApplication, WSGIEnvironment
-def application_factory(output: typing.Iterable[bytes]) -> "WSGIApplication":
+def application_factory(output: typing.Iterable[bytes]) -> WSGIApplication:
def application(environ, start_response):
status = "200 OK"
def echo_body(
- environ: "WSGIEnvironment", start_response: "StartResponse"
+ environ: WSGIEnvironment, start_response: StartResponse
) -> typing.Iterable[bytes]:
status = "200 OK"
output = environ["wsgi.input"].read()
def echo_body_with_response_stream(
- environ: "WSGIEnvironment", start_response: "StartResponse"
+ environ: WSGIEnvironment, start_response: StartResponse
) -> typing.Iterable[bytes]:
status = "200 OK"
def raise_exc(
- environ: "WSGIEnvironment",
- start_response: "StartResponse",
- exc: typing.Type[Exception] = ValueError,
+ environ: WSGIEnvironment,
+ start_response: StartResponse,
+ exc: type[Exception] = ValueError,
) -> typing.Iterable[bytes]:
status = "500 Server Error"
output = b"Nope!"
SERVER_PORT is populated correctly from the requested URL.
"""
hello_world_app = application_factory([b"Hello, World!"])
- server_port: typing.Optional[str] = None
+ server_port: str | None = None
def app(environ, start_response):
nonlocal server_port