import contextlib
-import http
import inspect
import io
import json
import math
import queue
import sys
-import types
import typing
+import warnings
from concurrent.futures import Future
-from urllib.parse import unquote, urljoin, urlsplit
+from types import GeneratorType
+from urllib.parse import unquote, urljoin
-import anyio.abc
-import requests
+import anyio
+import httpx
from anyio.streams.stapled import StapledObjectStream
from starlette._utils import is_async_callable
-from starlette.types import Message, Receive, Scope, Send
+from starlette.types import ASGIApp, Message, Receive, Scope, Send
from starlette.websockets import WebSocketDisconnect
if sys.version_info >= (3, 8): # pragma: no cover
else: # pragma: no cover
from typing_extensions import TypedDict
-
_PortalFactoryType = typing.Callable[
[], typing.ContextManager[anyio.abc.BlockingPortal]
]
-
-# Annotations for `Session.request()`
-Cookies = typing.Union[
- typing.MutableMapping[str, str], requests.cookies.RequestsCookieJar
-]
-Params = typing.Union[bytes, typing.MutableMapping[str, str]]
-DataType = typing.Union[bytes, typing.MutableMapping[str, str], typing.IO]
-TimeOut = typing.Union[float, typing.Tuple[float, float]]
-FileType = typing.MutableMapping[str, typing.IO]
-AuthType = typing.Union[
- typing.Tuple[str, str],
- requests.auth.AuthBase,
- typing.Callable[[requests.PreparedRequest], requests.PreparedRequest],
-]
-
-
ASGIInstance = typing.Callable[[Receive, Send], typing.Awaitable[None]]
ASGI2App = typing.Callable[[Scope], ASGIInstance]
ASGI3App = typing.Callable[[Scope, Receive, Send], typing.Awaitable[None]]
-class _HeaderDict(requests.packages.urllib3._collections.HTTPHeaderDict):
- def get_all(self, key: str, default: str) -> str:
- return self.getheaders(key)
-
-
-class _MockOriginalResponse:
- """
- We have to jump through some hoops to present the response as if
- it was made using urllib3.
- """
-
- def __init__(self, headers: typing.List[typing.Tuple[bytes, bytes]]) -> None:
- self.msg = _HeaderDict(headers)
- self.closed = False
-
- def isclosed(self) -> bool:
- return self.closed
-
-
-class _Upgrade(Exception):
- def __init__(self, session: "WebSocketTestSession") -> None:
- self.session = session
-
-
-def _get_reason_phrase(status_code: int) -> str:
- try:
- return http.HTTPStatus(status_code).phrase
- except ValueError:
- return ""
-
-
def _is_asgi3(app: typing.Union[ASGI2App, ASGI3App]) -> bool:
if inspect.isclass(app):
return hasattr(app, "__await__")
backend_options: typing.Dict[str, typing.Any]
-class _ASGIAdapter(requests.adapters.HTTPAdapter):
+class _Upgrade(Exception):
+ def __init__(self, session: "WebSocketTestSession") -> None:
+ self.session = session
+
+
+class WebSocketTestSession:
+ def __init__(
+ self,
+ app: ASGI3App,
+ scope: Scope,
+ portal_factory: _PortalFactoryType,
+ ) -> None:
+ self.app = app
+ self.scope = scope
+ self.accepted_subprotocol = None
+ self.portal_factory = portal_factory
+ self._receive_queue: "queue.Queue[typing.Any]" = queue.Queue()
+ self._send_queue: "queue.Queue[typing.Any]" = queue.Queue()
+ self.extra_headers = None
+
+ def __enter__(self) -> "WebSocketTestSession":
+ self.exit_stack = contextlib.ExitStack()
+ self.portal = self.exit_stack.enter_context(self.portal_factory())
+
+ try:
+ _: "Future[None]" = self.portal.start_task_soon(self._run)
+ self.send({"type": "websocket.connect"})
+ message = self.receive()
+ self._raise_on_close(message)
+ except Exception:
+ self.exit_stack.close()
+ raise
+ self.accepted_subprotocol = message.get("subprotocol", None)
+ self.extra_headers = message.get("headers", None)
+ return self
+
+ def __exit__(self, *args: typing.Any) -> None:
+ try:
+ self.close(1000)
+ finally:
+ self.exit_stack.close()
+ while not self._send_queue.empty():
+ message = self._send_queue.get()
+ if isinstance(message, BaseException):
+ raise message
+
+ async def _run(self) -> None:
+ """
+ The sub-thread in which the websocket session runs.
+ """
+ scope = self.scope
+ receive = self._asgi_receive
+ send = self._asgi_send
+ try:
+ await self.app(scope, receive, send)
+ except BaseException as exc:
+ self._send_queue.put(exc)
+ raise
+
+ async def _asgi_receive(self) -> Message:
+ while self._receive_queue.empty():
+ await anyio.sleep(0)
+ return self._receive_queue.get()
+
+ async def _asgi_send(self, message: Message) -> None:
+ self._send_queue.put(message)
+
+ def _raise_on_close(self, message: Message) -> None:
+ if message["type"] == "websocket.close":
+ raise WebSocketDisconnect(
+ message.get("code", 1000), message.get("reason", "")
+ )
+
+ def send(self, message: Message) -> None:
+ self._receive_queue.put(message)
+
+ def send_text(self, data: str) -> None:
+ self.send({"type": "websocket.receive", "text": data})
+
+ def send_bytes(self, data: bytes) -> None:
+ self.send({"type": "websocket.receive", "bytes": data})
+
+ def send_json(self, data: typing.Any, mode: str = "text") -> None:
+ assert mode in ["text", "binary"]
+ text = json.dumps(data)
+ if mode == "text":
+ self.send({"type": "websocket.receive", "text": text})
+ else:
+ self.send({"type": "websocket.receive", "bytes": text.encode("utf-8")})
+
+ def close(self, code: int = 1000) -> None:
+ self.send({"type": "websocket.disconnect", "code": code})
+
+ def receive(self) -> Message:
+ message = self._send_queue.get()
+ if isinstance(message, BaseException):
+ raise message
+ return message
+
+ def receive_text(self) -> str:
+ message = self.receive()
+ self._raise_on_close(message)
+ return message["text"]
+
+ def receive_bytes(self) -> bytes:
+ message = self.receive()
+ self._raise_on_close(message)
+ return message["bytes"]
+
+ def receive_json(self, mode: str = "text") -> typing.Any:
+ assert mode in ["text", "binary"]
+ message = self.receive()
+ self._raise_on_close(message)
+ if mode == "text":
+ text = message["text"]
+ else:
+ text = message["bytes"].decode("utf-8")
+ return json.loads(text)
+
+
+class _TestClientTransport(httpx.BaseTransport):
def __init__(
self,
app: ASGI3App,
self.root_path = root_path
self.portal_factory = portal_factory
- def send(
- self, request: requests.PreparedRequest, *args: typing.Any, **kwargs: typing.Any
- ) -> requests.Response:
- scheme, netloc, path, query, fragment = (
- str(item) for item in urlsplit(request.url)
- )
+ def handle_request(self, request: httpx.Request) -> httpx.Response:
+ scheme = request.url.scheme
+ netloc = unquote(request.url.netloc.decode(encoding="ascii"))
+ path = request.url.path
+ raw_path = request.url.raw_path
+ query = unquote(request.url.query.decode(encoding="ascii"))
default_port = {"http": 80, "ws": 80, "https": 443, "wss": 443}[scheme]
# Include the 'host' header.
if "host" in request.headers:
headers: typing.List[typing.Tuple[bytes, bytes]] = []
- elif port == default_port:
+ elif port == default_port: # pragma: no cover
headers = [(b"host", host.encode())]
- else:
+ else: # pragma: no cover
headers = [(b"host", (f"{host}:{port}").encode())]
# Include other request headers.
scope = {
"type": "websocket",
"path": unquote(path),
- "raw_path": path.encode(),
+ "raw_path": raw_path,
"root_path": self.root_path,
"scheme": scheme,
"query_string": query.encode(),
"http_version": "1.1",
"method": request.method,
"path": unquote(path),
- "raw_path": path.encode(),
+ "raw_path": raw_path,
"root_path": self.root_path,
"scheme": scheme,
"query_string": query.encode(),
request_complete = False
response_started = False
response_complete: anyio.Event
- raw_kwargs: typing.Dict[str, typing.Any] = {"body": io.BytesIO()}
+ raw_kwargs: typing.Dict[str, typing.Any] = {"stream": io.BytesIO()}
template = None
context = None
await response_complete.wait()
return {"type": "http.disconnect"}
- body = request.body
+ body = request.read()
if isinstance(body, str):
- body_bytes: bytes = body.encode("utf-8")
+ body_bytes: bytes = body.encode("utf-8") # pragma: no cover
elif body is None:
- body_bytes = b""
- elif isinstance(body, types.GeneratorType):
- try:
+ body_bytes = b"" # pragma: no cover
+ elif isinstance(body, GeneratorType):
+ try: # pragma: no cover
chunk = body.send(None)
if isinstance(chunk, str):
chunk = chunk.encode("utf-8")
return {"type": "http.request", "body": chunk, "more_body": True}
- except StopIteration:
+ except StopIteration: # pragma: no cover
request_complete = True
return {"type": "http.request", "body": b""}
else:
assert (
not response_started
), 'Received multiple "http.response.start" messages.'
- raw_kwargs["version"] = 11
- raw_kwargs["status"] = message["status"]
- raw_kwargs["reason"] = _get_reason_phrase(message["status"])
+ raw_kwargs["status_code"] = message["status"]
raw_kwargs["headers"] = [
(key.decode(), value.decode())
for key, value in message.get("headers", [])
]
- raw_kwargs["preload_content"] = False
- raw_kwargs["original_response"] = _MockOriginalResponse(
- raw_kwargs["headers"]
- )
response_started = True
elif message["type"] == "http.response.body":
assert (
body = message.get("body", b"")
more_body = message.get("more_body", False)
if request.method != "HEAD":
- raw_kwargs["body"].write(body)
+ raw_kwargs["stream"].write(body)
if not more_body:
- raw_kwargs["body"].seek(0)
+ raw_kwargs["stream"].seek(0)
response_complete.set()
elif message["type"] == "http.response.template":
template = message["template"]
assert response_started, "TestClient did not receive any response."
elif not response_started:
raw_kwargs = {
- "version": 11,
- "status": 500,
- "reason": "Internal Server Error",
+ "status_code": 500,
"headers": [],
- "preload_content": False,
- "original_response": _MockOriginalResponse([]),
- "body": io.BytesIO(),
+ "stream": io.BytesIO(),
}
- raw = requests.packages.urllib3.HTTPResponse(**raw_kwargs)
- response = self.build_response(request, raw)
+ raw_kwargs["stream"] = httpx.ByteStream(raw_kwargs["stream"].read())
+
+ response = httpx.Response(**raw_kwargs, request=request)
if template is not None:
- response.template = template
- response.context = context
+ response.template = template # type: ignore[attr-defined]
+ response.context = context # type: ignore[attr-defined]
return response
-class WebSocketTestSession:
- def __init__(
- self,
- app: ASGI3App,
- scope: Scope,
- portal_factory: _PortalFactoryType,
- ) -> None:
- self.app = app
- self.scope = scope
- self.accepted_subprotocol = None
- self.extra_headers = None
- self.portal_factory = portal_factory
- self._receive_queue: "queue.Queue[typing.Any]" = queue.Queue()
- self._send_queue: "queue.Queue[typing.Any]" = queue.Queue()
-
- def __enter__(self) -> "WebSocketTestSession":
- self.exit_stack = contextlib.ExitStack()
- self.portal = self.exit_stack.enter_context(self.portal_factory())
-
- try:
- _: "Future[None]" = self.portal.start_task_soon(self._run)
- self.send({"type": "websocket.connect"})
- message = self.receive()
- self._raise_on_close(message)
- except Exception:
- self.exit_stack.close()
- raise
- self.accepted_subprotocol = message.get("subprotocol", None)
- self.extra_headers = message.get("headers", None)
- return self
-
- def __exit__(self, *args: typing.Any) -> None:
- try:
- self.close(1000)
- finally:
- self.exit_stack.close()
- while not self._send_queue.empty():
- message = self._send_queue.get()
- if isinstance(message, BaseException):
- raise message
-
- async def _run(self) -> None:
- """
- The sub-thread in which the websocket session runs.
- """
- scope = self.scope
- receive = self._asgi_receive
- send = self._asgi_send
- try:
- await self.app(scope, receive, send)
- except BaseException as exc:
- self._send_queue.put(exc)
- raise
-
- async def _asgi_receive(self) -> Message:
- while self._receive_queue.empty():
- await anyio.sleep(0)
- return self._receive_queue.get()
-
- async def _asgi_send(self, message: Message) -> None:
- self._send_queue.put(message)
-
- def _raise_on_close(self, message: Message) -> None:
- if message["type"] == "websocket.close":
- raise WebSocketDisconnect(
- message.get("code", 1000), message.get("reason", "")
- )
-
- def send(self, message: Message) -> None:
- self._receive_queue.put(message)
-
- def send_text(self, data: str) -> None:
- self.send({"type": "websocket.receive", "text": data})
-
- def send_bytes(self, data: bytes) -> None:
- self.send({"type": "websocket.receive", "bytes": data})
-
- def send_json(self, data: typing.Any, mode: str = "text") -> None:
- assert mode in ["text", "binary"]
- text = json.dumps(data)
- if mode == "text":
- self.send({"type": "websocket.receive", "text": text})
- else:
- self.send({"type": "websocket.receive", "bytes": text.encode("utf-8")})
-
- def close(self, code: int = 1000) -> None:
- self.send({"type": "websocket.disconnect", "code": code})
-
- def receive(self) -> Message:
- message = self._send_queue.get()
- if isinstance(message, BaseException):
- raise message
- return message
-
- def receive_text(self) -> str:
- message = self.receive()
- self._raise_on_close(message)
- return message["text"]
-
- def receive_bytes(self) -> bytes:
- message = self.receive()
- self._raise_on_close(message)
- return message["bytes"]
-
- def receive_json(self, mode: str = "text") -> typing.Any:
- assert mode in ["text", "binary"]
- message = self.receive()
- self._raise_on_close(message)
- if mode == "text":
- text = message["text"]
- else:
- text = message["bytes"].decode("utf-8")
- return json.loads(text)
-
-
-class TestClient(requests.Session):
- __test__ = False # For pytest to not discover this up.
+class TestClient(httpx.Client):
+ __test__ = False
task: "Future[None]"
portal: typing.Optional[anyio.abc.BlockingPortal] = None
def __init__(
self,
- app: typing.Union[ASGI2App, ASGI3App],
+ app: ASGIApp,
base_url: str = "http://testserver",
raise_server_exceptions: bool = True,
root_path: str = "",
backend: str = "asyncio",
backend_options: typing.Optional[typing.Dict[str, typing.Any]] = None,
+ cookies: httpx._client.CookieTypes = None,
) -> None:
- super().__init__()
self.async_backend = _AsyncBackend(
backend=backend, backend_options=backend_options or {}
)
app = typing.cast(ASGI3App, app)
asgi_app = app
else:
- app = typing.cast(ASGI2App, app)
- asgi_app = _WrapASGI2(app) # type: ignore
- adapter = _ASGIAdapter(
- asgi_app,
+ app = typing.cast(ASGI2App, app) # type: ignore[assignment]
+ asgi_app = _WrapASGI2(app) # type: ignore[arg-type]
+ self.app = asgi_app
+ transport = _TestClientTransport(
+ self.app,
portal_factory=self._portal_factory,
raise_server_exceptions=raise_server_exceptions,
root_path=root_path,
)
- self.mount("http://", adapter)
- self.mount("https://", adapter)
- self.mount("ws://", adapter)
- self.mount("wss://", adapter)
- self.headers.update({"user-agent": "testclient"})
- self.app = asgi_app
- self.base_url = base_url
+ super().__init__(
+ app=self.app,
+ base_url=base_url,
+ headers={"user-agent": "testclient"},
+ transport=transport,
+ follow_redirects=True,
+ cookies=cookies,
+ )
@contextlib.contextmanager
- def _portal_factory(
- self,
- ) -> typing.Generator[anyio.abc.BlockingPortal, None, None]:
+ def _portal_factory(self) -> typing.Generator[anyio.abc.BlockingPortal, None, None]:
if self.portal is not None:
yield self.portal
else:
with anyio.start_blocking_portal(**self.async_backend) as portal:
yield portal
- def request( # type: ignore
+ def _choose_redirect_arg(
+ self,
+ follow_redirects: typing.Optional[bool],
+ allow_redirects: typing.Optional[bool],
+ ) -> typing.Union[bool, httpx._client.UseClientDefault]:
+ redirect: typing.Union[
+ bool, httpx._client.UseClientDefault
+ ] = httpx._client.USE_CLIENT_DEFAULT
+ if allow_redirects is not None:
+ message = (
+ "The `allow_redirects` argument is deprecated. "
+ "Use `follow_redirects` instead."
+ )
+ warnings.warn(message, DeprecationWarning)
+ redirect = allow_redirects
+ if follow_redirects is not None:
+ redirect = follow_redirects
+ elif allow_redirects is not None and follow_redirects is not None:
+ raise RuntimeError( # pragma: no cover
+ "Cannot use both `allow_redirects` and `follow_redirects`."
+ )
+ return redirect
+
+ def request( # type: ignore[override]
self,
method: str,
- url: str,
- params: Params = None,
- data: DataType = None,
- headers: typing.MutableMapping[str, str] = None,
- cookies: Cookies = None,
- files: FileType = None,
- auth: AuthType = None,
- timeout: TimeOut = None,
- allow_redirects: bool = None,
- proxies: typing.MutableMapping[str, str] = None,
- hooks: typing.Any = None,
- stream: bool = None,
- verify: typing.Union[bool, str] = None,
- cert: typing.Union[str, typing.Tuple[str, str]] = None,
+ url: httpx._types.URLTypes,
+ *,
+ content: httpx._types.RequestContent = None,
+ data: httpx._types.RequestData = None,
+ files: httpx._types.RequestFiles = None,
json: typing.Any = None,
- ) -> requests.Response:
- url = urljoin(self.base_url, url)
+ params: httpx._types.QueryParamTypes = None,
+ headers: httpx._types.HeaderTypes = None,
+ cookies: httpx._types.CookieTypes = None,
+ auth: typing.Union[
+ httpx._types.AuthTypes, httpx._client.UseClientDefault
+ ] = httpx._client.USE_CLIENT_DEFAULT,
+ follow_redirects: bool = None,
+ allow_redirects: bool = None,
+ timeout: typing.Union[
+ httpx._client.TimeoutTypes, httpx._client.UseClientDefault
+ ] = httpx._client.USE_CLIENT_DEFAULT,
+ extensions: dict = None,
+ ) -> httpx.Response:
+ url = self.base_url.join(url)
+ redirect = self._choose_redirect_arg(follow_redirects, allow_redirects)
return super().request(
method,
+ url,
+ content=content,
+ data=data,
+ files=files,
+ json=json,
+ params=params,
+ headers=headers,
+ cookies=cookies,
+ auth=auth,
+ follow_redirects=redirect,
+ timeout=timeout,
+ extensions=extensions,
+ )
+
+ def get( # type: ignore[override]
+ self,
+ url: httpx._types.URLTypes,
+ *,
+ params: httpx._types.QueryParamTypes = None,
+ headers: httpx._types.HeaderTypes = None,
+ cookies: httpx._types.CookieTypes = None,
+ auth: typing.Union[
+ httpx._types.AuthTypes, httpx._client.UseClientDefault
+ ] = httpx._client.USE_CLIENT_DEFAULT,
+ follow_redirects: bool = None,
+ allow_redirects: bool = None,
+ timeout: typing.Union[
+ httpx._client.TimeoutTypes, httpx._client.UseClientDefault
+ ] = httpx._client.USE_CLIENT_DEFAULT,
+ extensions: dict = None,
+ ) -> httpx.Response:
+ redirect = self._choose_redirect_arg(follow_redirects, allow_redirects)
+ return super().get(
+ url,
+ params=params,
+ headers=headers,
+ cookies=cookies,
+ auth=auth,
+ follow_redirects=redirect,
+ timeout=timeout,
+ extensions=extensions,
+ )
+
+ def options( # type: ignore[override]
+ self,
+ url: httpx._types.URLTypes,
+ *,
+ params: httpx._types.QueryParamTypes = None,
+ headers: httpx._types.HeaderTypes = None,
+ cookies: httpx._types.CookieTypes = None,
+ auth: typing.Union[
+ httpx._types.AuthTypes, httpx._client.UseClientDefault
+ ] = httpx._client.USE_CLIENT_DEFAULT,
+ follow_redirects: bool = None,
+ allow_redirects: bool = None,
+ timeout: typing.Union[
+ httpx._client.TimeoutTypes, httpx._client.UseClientDefault
+ ] = httpx._client.USE_CLIENT_DEFAULT,
+ extensions: dict = None,
+ ) -> httpx.Response:
+ redirect = self._choose_redirect_arg(follow_redirects, allow_redirects)
+ return super().options(
+ url,
+ params=params,
+ headers=headers,
+ cookies=cookies,
+ auth=auth,
+ follow_redirects=redirect,
+ timeout=timeout,
+ extensions=extensions,
+ )
+
+ def head( # type: ignore[override]
+ self,
+ url: httpx._types.URLTypes,
+ *,
+ params: httpx._types.QueryParamTypes = None,
+ headers: httpx._types.HeaderTypes = None,
+ cookies: httpx._types.CookieTypes = None,
+ auth: typing.Union[
+ httpx._types.AuthTypes, httpx._client.UseClientDefault
+ ] = httpx._client.USE_CLIENT_DEFAULT,
+ follow_redirects: bool = None,
+ allow_redirects: bool = None,
+ timeout: typing.Union[
+ httpx._client.TimeoutTypes, httpx._client.UseClientDefault
+ ] = httpx._client.USE_CLIENT_DEFAULT,
+ extensions: dict = None,
+ ) -> httpx.Response:
+ redirect = self._choose_redirect_arg(follow_redirects, allow_redirects)
+ return super().head(
url,
params=params,
+ headers=headers,
+ cookies=cookies,
+ auth=auth,
+ follow_redirects=redirect,
+ timeout=timeout,
+ extensions=extensions,
+ )
+
+ def post( # type: ignore[override]
+ self,
+ url: httpx._types.URLTypes,
+ *,
+ content: httpx._types.RequestContent = None,
+ data: httpx._types.RequestData = None,
+ files: httpx._types.RequestFiles = None,
+ json: typing.Any = None,
+ params: httpx._types.QueryParamTypes = None,
+ headers: httpx._types.HeaderTypes = None,
+ cookies: httpx._types.CookieTypes = None,
+ auth: typing.Union[
+ httpx._types.AuthTypes, httpx._client.UseClientDefault
+ ] = httpx._client.USE_CLIENT_DEFAULT,
+ follow_redirects: bool = None,
+ allow_redirects: bool = None,
+ timeout: typing.Union[
+ httpx._client.TimeoutTypes, httpx._client.UseClientDefault
+ ] = httpx._client.USE_CLIENT_DEFAULT,
+ extensions: dict = None,
+ ) -> httpx.Response:
+ redirect = self._choose_redirect_arg(follow_redirects, allow_redirects)
+ return super().post(
+ url,
+ content=content,
data=data,
+ files=files,
+ json=json,
+ params=params,
headers=headers,
cookies=cookies,
+ auth=auth,
+ follow_redirects=redirect,
+ timeout=timeout,
+ extensions=extensions,
+ )
+
+ def put( # type: ignore[override]
+ self,
+ url: httpx._types.URLTypes,
+ *,
+ content: httpx._types.RequestContent = None,
+ data: httpx._types.RequestData = None,
+ files: httpx._types.RequestFiles = None,
+ json: typing.Any = None,
+ params: httpx._types.QueryParamTypes = None,
+ headers: httpx._types.HeaderTypes = None,
+ cookies: httpx._types.CookieTypes = None,
+ auth: typing.Union[
+ httpx._types.AuthTypes, httpx._client.UseClientDefault
+ ] = httpx._client.USE_CLIENT_DEFAULT,
+ follow_redirects: bool = None,
+ allow_redirects: bool = None,
+ timeout: typing.Union[
+ httpx._client.TimeoutTypes, httpx._client.UseClientDefault
+ ] = httpx._client.USE_CLIENT_DEFAULT,
+ extensions: dict = None,
+ ) -> httpx.Response:
+ redirect = self._choose_redirect_arg(follow_redirects, allow_redirects)
+ return super().put(
+ url,
+ content=content,
+ data=data,
files=files,
+ json=json,
+ params=params,
+ headers=headers,
+ cookies=cookies,
auth=auth,
+ follow_redirects=redirect,
timeout=timeout,
- allow_redirects=allow_redirects,
- proxies=proxies,
- hooks=hooks,
- stream=stream,
- verify=verify,
- cert=cert,
+ extensions=extensions,
+ )
+
+ def patch( # type: ignore[override]
+ self,
+ url: httpx._types.URLTypes,
+ *,
+ content: httpx._types.RequestContent = None,
+ data: httpx._types.RequestData = None,
+ files: httpx._types.RequestFiles = None,
+ json: typing.Any = None,
+ params: httpx._types.QueryParamTypes = None,
+ headers: httpx._types.HeaderTypes = None,
+ cookies: httpx._types.CookieTypes = None,
+ auth: typing.Union[
+ httpx._types.AuthTypes, httpx._client.UseClientDefault
+ ] = httpx._client.USE_CLIENT_DEFAULT,
+ follow_redirects: bool = None,
+ allow_redirects: bool = None,
+ timeout: typing.Union[
+ httpx._client.TimeoutTypes, httpx._client.UseClientDefault
+ ] = httpx._client.USE_CLIENT_DEFAULT,
+ extensions: dict = None,
+ ) -> httpx.Response:
+ redirect = self._choose_redirect_arg(follow_redirects, allow_redirects)
+ return super().patch(
+ url,
+ content=content,
+ data=data,
+ files=files,
json=json,
+ params=params,
+ headers=headers,
+ cookies=cookies,
+ auth=auth,
+ follow_redirects=redirect,
+ timeout=timeout,
+ extensions=extensions,
+ )
+
+ def delete( # type: ignore[override]
+ self,
+ url: httpx._types.URLTypes,
+ *,
+ params: httpx._types.QueryParamTypes = None,
+ headers: httpx._types.HeaderTypes = None,
+ cookies: httpx._types.CookieTypes = None,
+ auth: typing.Union[
+ httpx._types.AuthTypes, httpx._client.UseClientDefault
+ ] = httpx._client.USE_CLIENT_DEFAULT,
+ follow_redirects: bool = None,
+ allow_redirects: bool = None,
+ timeout: typing.Union[
+ httpx._client.TimeoutTypes, httpx._client.UseClientDefault
+ ] = httpx._client.USE_CLIENT_DEFAULT,
+ extensions: dict = None,
+ ) -> httpx.Response:
+ redirect = self._choose_redirect_arg(follow_redirects, allow_redirects)
+ return super().delete(
+ url,
+ params=params,
+ headers=headers,
+ cookies=cookies,
+ auth=auth,
+ follow_redirects=redirect,
+ timeout=timeout,
+ extensions=extensions,
)
def websocket_connect(