import queue
import sys
import typing
-import warnings
from concurrent.futures import Future
from functools import cached_property
from types import GeneratorType
with anyio.from_thread.start_blocking_portal(**self.async_backend) as portal:
yield portal
- def _choose_redirect_arg(
- self, follow_redirects: bool | None, allow_redirects: bool | None
- ) -> bool | httpx._client.UseClientDefault:
- redirect: bool | httpx._client.UseClientDefault = httpx._client.USE_CLIENT_DEFAULT
- if allow_redirects is not None:
- message = "The `allow_redirects` argument is deprecated. Use `follow_redirects` instead."
- warnings.warn(message, DeprecationWarning)
- redirect = allow_redirects
- if follow_redirects is not None:
- redirect = follow_redirects
- elif allow_redirects is not None and follow_redirects is not None:
- raise RuntimeError( # pragma: no cover
- "Cannot use both `allow_redirects` and `follow_redirects`."
- )
- return redirect
-
def request( # type: ignore[override]
self,
method: str,
headers: httpx._types.HeaderTypes | None = None,
cookies: httpx._types.CookieTypes | None = None,
auth: httpx._types.AuthTypes | httpx._client.UseClientDefault = httpx._client.USE_CLIENT_DEFAULT,
- follow_redirects: bool | None = None,
- allow_redirects: bool | None = None,
+ follow_redirects: bool | httpx._client.UseClientDefault = httpx._client.USE_CLIENT_DEFAULT,
timeout: httpx._types.TimeoutTypes | httpx._client.UseClientDefault = httpx._client.USE_CLIENT_DEFAULT,
extensions: dict[str, typing.Any] | None = None,
) -> httpx.Response:
url = self._merge_url(url)
- redirect = self._choose_redirect_arg(follow_redirects, allow_redirects)
return super().request(
method,
url,
headers=headers,
cookies=cookies,
auth=auth,
- follow_redirects=redirect,
+ follow_redirects=follow_redirects,
timeout=timeout,
extensions=extensions,
)
headers: httpx._types.HeaderTypes | None = None,
cookies: httpx._types.CookieTypes | None = None,
auth: httpx._types.AuthTypes | httpx._client.UseClientDefault = httpx._client.USE_CLIENT_DEFAULT,
- follow_redirects: bool | None = None,
- allow_redirects: bool | None = None,
+ follow_redirects: bool | httpx._client.UseClientDefault = httpx._client.USE_CLIENT_DEFAULT,
timeout: httpx._types.TimeoutTypes | httpx._client.UseClientDefault = httpx._client.USE_CLIENT_DEFAULT,
extensions: dict[str, typing.Any] | None = None,
) -> httpx.Response:
- redirect = self._choose_redirect_arg(follow_redirects, allow_redirects)
return super().get(
url,
params=params,
headers=headers,
cookies=cookies,
auth=auth,
- follow_redirects=redirect,
+ follow_redirects=follow_redirects,
timeout=timeout,
extensions=extensions,
)
headers: httpx._types.HeaderTypes | None = None,
cookies: httpx._types.CookieTypes | None = None,
auth: httpx._types.AuthTypes | httpx._client.UseClientDefault = httpx._client.USE_CLIENT_DEFAULT,
- follow_redirects: bool | None = None,
- allow_redirects: bool | None = None,
+ follow_redirects: bool | httpx._client.UseClientDefault = httpx._client.USE_CLIENT_DEFAULT,
timeout: httpx._types.TimeoutTypes | httpx._client.UseClientDefault = httpx._client.USE_CLIENT_DEFAULT,
extensions: dict[str, typing.Any] | None = None,
) -> httpx.Response:
- redirect = self._choose_redirect_arg(follow_redirects, allow_redirects)
return super().options(
url,
params=params,
headers=headers,
cookies=cookies,
auth=auth,
- follow_redirects=redirect,
+ follow_redirects=follow_redirects,
timeout=timeout,
extensions=extensions,
)
headers: httpx._types.HeaderTypes | None = None,
cookies: httpx._types.CookieTypes | None = None,
auth: httpx._types.AuthTypes | httpx._client.UseClientDefault = httpx._client.USE_CLIENT_DEFAULT,
- follow_redirects: bool | None = None,
- allow_redirects: bool | None = None,
+ follow_redirects: bool | httpx._client.UseClientDefault = httpx._client.USE_CLIENT_DEFAULT,
timeout: httpx._types.TimeoutTypes | httpx._client.UseClientDefault = httpx._client.USE_CLIENT_DEFAULT,
extensions: dict[str, typing.Any] | None = None,
) -> httpx.Response:
- redirect = self._choose_redirect_arg(follow_redirects, allow_redirects)
return super().head(
url,
params=params,
headers=headers,
cookies=cookies,
auth=auth,
- follow_redirects=redirect,
+ follow_redirects=follow_redirects,
timeout=timeout,
extensions=extensions,
)
headers: httpx._types.HeaderTypes | None = None,
cookies: httpx._types.CookieTypes | None = None,
auth: httpx._types.AuthTypes | httpx._client.UseClientDefault = httpx._client.USE_CLIENT_DEFAULT,
- follow_redirects: bool | None = None,
- allow_redirects: bool | None = None,
+ follow_redirects: bool | httpx._client.UseClientDefault = httpx._client.USE_CLIENT_DEFAULT,
timeout: httpx._types.TimeoutTypes | httpx._client.UseClientDefault = httpx._client.USE_CLIENT_DEFAULT,
extensions: dict[str, typing.Any] | None = None,
) -> httpx.Response:
- redirect = self._choose_redirect_arg(follow_redirects, allow_redirects)
return super().post(
url,
content=content,
headers=headers,
cookies=cookies,
auth=auth,
- follow_redirects=redirect,
+ follow_redirects=follow_redirects,
timeout=timeout,
extensions=extensions,
)
headers: httpx._types.HeaderTypes | None = None,
cookies: httpx._types.CookieTypes | None = None,
auth: httpx._types.AuthTypes | httpx._client.UseClientDefault = httpx._client.USE_CLIENT_DEFAULT,
- follow_redirects: bool | None = None,
- allow_redirects: bool | None = None,
+ follow_redirects: bool | httpx._client.UseClientDefault = httpx._client.USE_CLIENT_DEFAULT,
timeout: httpx._types.TimeoutTypes | httpx._client.UseClientDefault = httpx._client.USE_CLIENT_DEFAULT,
extensions: dict[str, typing.Any] | None = None,
) -> httpx.Response:
- redirect = self._choose_redirect_arg(follow_redirects, allow_redirects)
return super().put(
url,
content=content,
headers=headers,
cookies=cookies,
auth=auth,
- follow_redirects=redirect,
+ follow_redirects=follow_redirects,
timeout=timeout,
extensions=extensions,
)
headers: httpx._types.HeaderTypes | None = None,
cookies: httpx._types.CookieTypes | None = None,
auth: httpx._types.AuthTypes | httpx._client.UseClientDefault = httpx._client.USE_CLIENT_DEFAULT,
- follow_redirects: bool | None = None,
- allow_redirects: bool | None = None,
+ follow_redirects: bool | httpx._client.UseClientDefault = httpx._client.USE_CLIENT_DEFAULT,
timeout: httpx._types.TimeoutTypes | httpx._client.UseClientDefault = httpx._client.USE_CLIENT_DEFAULT,
extensions: dict[str, typing.Any] | None = None,
) -> httpx.Response:
- redirect = self._choose_redirect_arg(follow_redirects, allow_redirects)
return super().patch(
url,
content=content,
headers=headers,
cookies=cookies,
auth=auth,
- follow_redirects=redirect,
+ follow_redirects=follow_redirects,
timeout=timeout,
extensions=extensions,
)
headers: httpx._types.HeaderTypes | None = None,
cookies: httpx._types.CookieTypes | None = None,
auth: httpx._types.AuthTypes | httpx._client.UseClientDefault = httpx._client.USE_CLIENT_DEFAULT,
- follow_redirects: bool | None = None,
- allow_redirects: bool | None = None,
+ follow_redirects: bool | httpx._client.UseClientDefault = httpx._client.USE_CLIENT_DEFAULT,
timeout: httpx._types.TimeoutTypes | httpx._client.UseClientDefault = httpx._client.USE_CLIENT_DEFAULT,
extensions: dict[str, typing.Any] | None = None,
) -> httpx.Response:
- redirect = self._choose_redirect_arg(follow_redirects, allow_redirects)
return super().delete(
url,
params=params,
headers=headers,
cookies=cookies,
auth=auth,
- follow_redirects=redirect,
+ follow_redirects=follow_redirects,
timeout=timeout,
extensions=extensions,
)