import enum
import typing
import warnings
+from contextlib import contextmanager
from types import TracebackType
from .__version__ import __version__
from ._auth import Auth, BasicAuth, FunctionAuth
+from ._compat import asynccontextmanager
from ._config import (
DEFAULT_LIMITS,
DEFAULT_MAX_REDIRECTS,
def params(self, params: QueryParamTypes) -> None:
self._params = QueryParams(params)
- def stream(
- self,
- method: str,
- url: URLTypes,
- *,
- content: RequestContent = None,
- data: RequestData = None,
- files: RequestFiles = None,
- json: typing.Any = None,
- params: QueryParamTypes = None,
- headers: HeaderTypes = None,
- cookies: CookieTypes = None,
- auth: typing.Union[AuthTypes, UnsetType] = UNSET,
- allow_redirects: bool = True,
- timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
- ) -> "StreamContextManager":
- """
- Alternative to `httpx.request()` that streams the response body
- instead of loading it into memory at once.
-
- **Parameters**: See `httpx.request`.
-
- See also: [Streaming Responses][0]
-
- [0]: /quickstart#streaming-responses
- """
- request = self.build_request(
- method=method,
- url=url,
- content=content,
- data=data,
- files=files,
- json=json,
- params=params,
- headers=headers,
- cookies=cookies,
- )
- return StreamContextManager(
- client=self,
- request=request,
- auth=auth,
- allow_redirects=allow_redirects,
- timeout=timeout,
- )
-
def build_request(
self,
method: str,
request, auth=auth, allow_redirects=allow_redirects, timeout=timeout
)
+ @contextmanager
+ def stream(
+ self,
+ method: str,
+ url: URLTypes,
+ *,
+ content: RequestContent = None,
+ data: RequestData = None,
+ files: RequestFiles = None,
+ json: typing.Any = None,
+ params: QueryParamTypes = None,
+ headers: HeaderTypes = None,
+ cookies: CookieTypes = None,
+ auth: typing.Union[AuthTypes, UnsetType] = UNSET,
+ allow_redirects: bool = True,
+ timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
+ ) -> typing.Iterator[Response]:
+ """
+ Alternative to `httpx.request()` that streams the response body
+ instead of loading it into memory at once.
+
+ **Parameters**: See `httpx.request`.
+
+ See also: [Streaming Responses][0]
+
+ [0]: /quickstart#streaming-responses
+ """
+ request = self.build_request(
+ method=method,
+ url=url,
+ content=content,
+ data=data,
+ files=files,
+ json=json,
+ params=params,
+ headers=headers,
+ cookies=cookies,
+ )
+ response = self.send(
+ request=request,
+ auth=auth,
+ allow_redirects=allow_redirects,
+ timeout=timeout,
+ stream=True,
+ )
+ try:
+ yield response
+ finally:
+ response.close()
+
def send(
self,
request: Request,
)
return response
+ @asynccontextmanager
+ async def stream(
+ self,
+ method: str,
+ url: URLTypes,
+ *,
+ content: RequestContent = None,
+ data: RequestData = None,
+ files: RequestFiles = None,
+ json: typing.Any = None,
+ params: QueryParamTypes = None,
+ headers: HeaderTypes = None,
+ cookies: CookieTypes = None,
+ auth: typing.Union[AuthTypes, UnsetType] = UNSET,
+ allow_redirects: bool = True,
+ timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
+ ) -> typing.AsyncIterator[Response]:
+ """
+ Alternative to `httpx.request()` that streams the response body
+ instead of loading it into memory at once.
+
+ **Parameters**: See `httpx.request`.
+
+ See also: [Streaming Responses][0]
+
+ [0]: /quickstart#streaming-responses
+ """
+ request = self.build_request(
+ method=method,
+ url=url,
+ content=content,
+ data=data,
+ files=files,
+ json=json,
+ params=params,
+ headers=headers,
+ cookies=cookies,
+ )
+ response = await self.send(
+ request=request,
+ auth=auth,
+ allow_redirects=allow_redirects,
+ timeout=timeout,
+ stream=True,
+ )
+ try:
+ yield response
+ finally:
+ await response.aclose()
+
async def send(
self,
request: Request,
"See https://www.python-httpx.org/async/#opening-and-closing-clients "
"for details."
)
-
-
-class StreamContextManager:
- def __init__(
- self,
- client: BaseClient,
- request: Request,
- *,
- auth: typing.Union[AuthTypes, UnsetType] = UNSET,
- allow_redirects: bool = True,
- timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
- close_client: bool = False,
- ) -> None:
- self.client = client
- self.request = request
- self.auth = auth
- self.allow_redirects = allow_redirects
- self.timeout = timeout
- self.close_client = close_client
-
- def __enter__(self) -> "Response":
- assert isinstance(self.client, Client)
- self.response = self.client.send(
- request=self.request,
- auth=self.auth,
- allow_redirects=self.allow_redirects,
- timeout=self.timeout,
- stream=True,
- )
- return self.response
-
- def __exit__(
- self,
- exc_type: typing.Type[BaseException] = None,
- exc_value: BaseException = None,
- traceback: TracebackType = None,
- ) -> None:
- assert isinstance(self.client, Client)
- self.response.close()
- if self.close_client:
- self.client.close()
-
- async def __aenter__(self) -> "Response":
- assert isinstance(self.client, AsyncClient)
- self.response = await self.client.send(
- request=self.request,
- auth=self.auth,
- allow_redirects=self.allow_redirects,
- timeout=self.timeout,
- stream=True,
- )
- return self.response
-
- async def __aexit__(
- self,
- exc_type: typing.Type[BaseException] = None,
- exc_value: BaseException = None,
- traceback: TracebackType = None,
- ) -> None:
- assert isinstance(self.client, AsyncClient)
- await self.response.aclose()