def handle_request(self, method, url, headers, stream, extensions):
message = {"text": "Hello, world!"}
content = json.dumps(message).encode("utf-8")
- stream = [content]
+ stream = httpx.ByteStream(content)
headers = [(b"content-type", b"application/json")]
extensions = {}
return 200, headers, stream, extensions
location = b"https://%s%s" % (host, path)
else:
location = b"https://%s:%d%s" % (host, port, path)
- stream = [b""]
+ stream = httpx.ByteStream(b"")
headers = [(b"location", location)]
extensions = {}
return 303, headers, stream, extensions
from ._auth import Auth, BasicAuth, DigestAuth
from ._client import AsyncClient, Client
from ._config import Limits, Proxy, Timeout, create_ssl_context
+from ._content import ByteStream
from ._exceptions import (
CloseError,
ConnectError,
from ._models import URL, Cookies, Headers, QueryParams, Request, Response
from ._status_codes import StatusCode, codes
from ._transports.asgi import ASGITransport
-from ._transports.base import AsyncBaseTransport, BaseTransport
+from ._transports.base import (
+ AsyncBaseTransport,
+ AsyncByteStream,
+ BaseTransport,
+ SyncByteStream,
+)
from ._transports.default import AsyncHTTPTransport, HTTPTransport
from ._transports.mock import MockTransport
from ._transports.wsgi import WSGITransport
"__version__",
"ASGITransport",
"AsyncBaseTransport",
+ "AsyncByteStream",
"AsyncClient",
"AsyncHTTPTransport",
"Auth",
"BaseTransport",
"BasicAuth",
+ "ByteStream",
"Client",
"CloseError",
"codes",
"stream",
"StreamConsumed",
"StreamError",
+ "SyncByteStream",
"Timeout",
"TimeoutException",
"TooManyRedirects",
from ._models import URL, Cookies, Headers, QueryParams, Request, Response
from ._status_codes import codes
from ._transports.asgi import ASGITransport
-from ._transports.base import AsyncBaseTransport, BaseTransport
+from ._transports.base import (
+ AsyncBaseTransport,
+ AsyncByteStream,
+ BaseTransport,
+ SyncByteStream,
+)
from ._transports.default import AsyncHTTPTransport, HTTPTransport
from ._transports.wsgi import WSGITransport
from ._types import (
AuthTypes,
- ByteStream,
CertTypes,
CookieTypes,
HeaderTypes,
def _redirect_stream(
self, request: Request, method: str
- ) -> typing.Optional[ByteStream]:
+ ) -> typing.Optional[typing.Union[SyncByteStream, AsyncByteStream]]:
"""
Return the body that should be used for the redirect request.
"""
def on_close(response: Response) -> None:
response.elapsed = datetime.timedelta(seconds=timer.sync_elapsed())
- if "close" in extensions:
- extensions["close"]()
+ stream.close()
response = Response(
status_code,
async def on_close(response: Response) -> None:
response.elapsed = datetime.timedelta(seconds=await timer.async_elapsed())
- if "aclose" in extensions:
- await extensions["aclose"]()
+ await stream.aclose()
response = Response(
status_code,
from ._exceptions import StreamConsumed
from ._multipart import MultipartStream
-from ._types import (
- ByteStream,
- RequestContent,
- RequestData,
- RequestFiles,
- ResponseContent,
-)
+from ._transports.base import AsyncByteStream, SyncByteStream
+from ._types import RequestContent, RequestData, RequestFiles, ResponseContent
from ._utils import primitive_value_to_str
-class PlainByteStream:
- """
- Request content encoded as plain bytes.
- """
-
- def __init__(self, body: bytes) -> None:
- self._body = body
+class ByteStream(AsyncByteStream, SyncByteStream):
+ def __init__(self, stream: bytes) -> None:
+ self._stream = stream
def __iter__(self) -> Iterator[bytes]:
- yield self._body
+ yield self._stream
async def __aiter__(self) -> AsyncIterator[bytes]:
- yield self._body
+ yield self._stream
-class GeneratorStream:
- """
- Request content encoded as plain bytes, using an byte generator.
- """
-
- def __init__(self, generator: Iterable[bytes]) -> None:
- self._generator = generator
+class IteratorByteStream(SyncByteStream):
+ def __init__(self, stream: Iterable[bytes]):
+ self._stream = stream
self._is_stream_consumed = False
+ self._is_generator = inspect.isgenerator(stream)
def __iter__(self) -> Iterator[bytes]:
- if self._is_stream_consumed:
+ if self._is_stream_consumed and self._is_generator:
raise StreamConsumed()
self._is_stream_consumed = True
- for part in self._generator:
+ for part in self._stream:
yield part
-class AsyncGeneratorStream:
- """
- Request content encoded as plain bytes, using an async byte iterator.
- """
-
- def __init__(self, agenerator: AsyncIterable[bytes]) -> None:
- self._agenerator = agenerator
+class AsyncIteratorByteStream(AsyncByteStream):
+ def __init__(self, stream: AsyncIterable[bytes]):
+ self._stream = stream
self._is_stream_consumed = False
+ self._is_generator = inspect.isasyncgen(stream)
async def __aiter__(self) -> AsyncIterator[bytes]:
- if self._is_stream_consumed:
+ if self._is_stream_consumed and self._is_generator:
raise StreamConsumed()
self._is_stream_consumed = True
- async for part in self._agenerator:
+ async for part in self._stream:
yield part
def encode_content(
- content: Union[str, bytes, ByteStream]
-) -> Tuple[Dict[str, str], ByteStream]:
- if isinstance(content, (str, bytes)):
+ content: Union[str, bytes, Iterable[bytes], AsyncIterable[bytes]]
+) -> Tuple[Dict[str, str], Union[SyncByteStream, AsyncByteStream]]:
+
+ if isinstance(content, (bytes, str)):
body = content.encode("utf-8") if isinstance(content, str) else content
content_length = str(len(body))
headers = {"Content-Length": content_length} if body else {}
- stream = PlainByteStream(body)
- return headers, stream
+ return headers, ByteStream(body)
- elif isinstance(content, (Iterable, AsyncIterable)):
+ elif isinstance(content, Iterable):
headers = {"Transfer-Encoding": "chunked"}
+ return headers, IteratorByteStream(content) # type: ignore
- # Generators should be wrapped in GeneratorStream/AsyncGeneratorStream
- # which will raise `StreamConsumed` if the stream is accessed more
- # than once. (Eg. Following HTTP 307 or HTTP 308 redirects.)
- if inspect.isgenerator(content):
- generator_stream = GeneratorStream(content) # type: ignore
- return headers, generator_stream
- if inspect.isasyncgen(content):
- agenerator_stream = AsyncGeneratorStream(content) # type: ignore
- return headers, agenerator_stream
-
- # Other iterables may be passed through as-is.
- return headers, content # type: ignore
+ elif isinstance(content, AsyncIterable):
+ headers = {"Transfer-Encoding": "chunked"}
+ return headers, AsyncIteratorByteStream(content)
raise TypeError(f"Unexpected type for 'content', {type(content)!r}")
content_length = str(len(body))
content_type = "application/x-www-form-urlencoded"
headers = {"Content-Length": content_length, "Content-Type": content_type}
- return headers, PlainByteStream(body)
+ return headers, ByteStream(body)
def encode_multipart_data(
data: dict, files: RequestFiles, boundary: bytes = None
-) -> Tuple[Dict[str, str], ByteStream]:
- stream = MultipartStream(data=data, files=files, boundary=boundary)
- headers = stream.get_headers()
- return headers, stream
+) -> Tuple[Dict[str, str], MultipartStream]:
+ multipart = MultipartStream(data=data, files=files, boundary=boundary)
+ headers = multipart.get_headers()
+ return headers, multipart
def encode_text(text: str) -> Tuple[Dict[str, str], ByteStream]:
content_length = str(len(body))
content_type = "text/plain; charset=utf-8"
headers = {"Content-Length": content_length, "Content-Type": content_type}
- return headers, PlainByteStream(body)
+ return headers, ByteStream(body)
def encode_html(html: str) -> Tuple[Dict[str, str], ByteStream]:
content_length = str(len(body))
content_type = "text/html; charset=utf-8"
headers = {"Content-Length": content_length, "Content-Type": content_type}
- return headers, PlainByteStream(body)
+ return headers, ByteStream(body)
def encode_json(json: Any) -> Tuple[Dict[str, str], ByteStream]:
content_length = str(len(body))
content_type = "application/json"
headers = {"Content-Length": content_length, "Content-Type": content_type}
- return headers, PlainByteStream(body)
+ return headers, ByteStream(body)
def encode_request(
files: RequestFiles = None,
json: Any = None,
boundary: bytes = None,
-) -> Tuple[Dict[str, str], ByteStream]:
+) -> Tuple[Dict[str, str], Union[SyncByteStream, AsyncByteStream]]:
"""
Handles encoding the given `content`, `data`, `files`, and `json`,
returning a two-tuple of (<headers>, <stream>).
elif json is not None:
return encode_json(json)
- return {}, PlainByteStream(b"")
+ return {}, ByteStream(b"")
def encode_response(
text: str = None,
html: str = None,
json: Any = None,
-) -> Tuple[Dict[str, str], ByteStream]:
+) -> Tuple[Dict[str, str], Union[SyncByteStream, AsyncByteStream]]:
"""
Handles encoding the given `content`, returning a two-tuple of
(<headers>, <stream>).
elif json is not None:
return encode_json(json)
- return {}, PlainByteStream(b"")
+ return {}, ByteStream(b"")
import rfc3986
import rfc3986.exceptions
-from ._content import PlainByteStream, encode_request, encode_response
+from ._content import ByteStream, encode_request, encode_response
from ._decoders import (
SUPPORTED_DECODERS,
ByteChunker,
request_context,
)
from ._status_codes import codes
+from ._transports.base import AsyncByteStream, SyncByteStream
from ._types import (
- ByteStream,
CookieTypes,
HeaderTypes,
PrimitiveData,
data: RequestData = None,
files: RequestFiles = None,
json: typing.Any = None,
- stream: ByteStream = None,
+ stream: typing.Union[SyncByteStream, AsyncByteStream] = None,
):
if isinstance(method, bytes):
self.method = method.decode("ascii").upper()
# If a streaming request has been read entirely into memory, then
# we can replace the stream with a raw bytes implementation,
# to ensure that any non-replayable streams can still be used.
- self.stream = PlainByteStream(self._content)
+ self.stream = ByteStream(self._content)
return self._content
async def aread(self) -> bytes:
# If a streaming request has been read entirely into memory, then
# we can replace the stream with a raw bytes implementation,
# to ensure that any non-replayable streams can still be used.
- self.stream = PlainByteStream(self._content)
+ self.stream = ByteStream(self._content)
return self._content
def __repr__(self) -> str:
text: str = None,
html: str = None,
json: typing.Any = None,
- stream: ByteStream = None,
+ stream: typing.Union[SyncByteStream, AsyncByteStream] = None,
request: Request = None,
extensions: dict = None,
history: typing.List["Response"] = None,
raise StreamConsumed()
if self.is_closed:
raise ResponseClosed()
- if not isinstance(self.stream, typing.Iterable):
+ if not isinstance(self.stream, SyncByteStream):
raise RuntimeError("Attempted to call a sync iterator on an async stream.")
self.is_stream_consumed = True
raise StreamConsumed()
if self.is_closed:
raise ResponseClosed()
- if not isinstance(self.stream, typing.AsyncIterable):
- raise RuntimeError("Attempted to call a async iterator on a sync stream.")
+ if not isinstance(self.stream, AsyncByteStream):
+            raise RuntimeError("Attempted to call an async iterator on a sync stream.")
self.is_stream_consumed = True
self._num_bytes_downloaded = 0
import typing
from pathlib import Path
+from ._transports.base import AsyncByteStream, SyncByteStream
from ._types import FileContent, FileTypes, RequestFiles
from ._utils import (
format_form_param,
yield from self.render_data()
-class MultipartStream:
+class MultipartStream(SyncByteStream, AsyncByteStream):
"""
Request content as streaming multipart encoded form data.
"""
import sniffio
-from .base import AsyncBaseTransport
+from .base import AsyncBaseTransport, AsyncByteStream
if typing.TYPE_CHECKING: # pragma: no cover
import asyncio
return asyncio.Event()
+class ASGIResponseStream(AsyncByteStream):
+ def __init__(self, body: typing.List[bytes]) -> None:
+ self._body = body
+
+ async def __aiter__(self) -> typing.AsyncIterator[bytes]:
+ yield b"".join(self._body)
+
+
class ASGITransport(AsyncBaseTransport):
"""
A custom AsyncTransport that handles sending requests directly to an ASGI app.
method: bytes,
url: typing.Tuple[bytes, bytes, typing.Optional[int], bytes],
headers: typing.List[typing.Tuple[bytes, bytes]],
- stream: typing.AsyncIterable[bytes],
+ stream: AsyncByteStream,
extensions: dict,
) -> typing.Tuple[
- int, typing.List[typing.Tuple[bytes, bytes]], typing.AsyncIterable[bytes], dict
+ int, typing.List[typing.Tuple[bytes, bytes]], AsyncByteStream, dict
]:
# ASGI scope.
scheme, host, port, full_path = url
assert status_code is not None
assert response_headers is not None
- async def response_stream() -> typing.AsyncIterator[bytes]:
- yield b"".join(body_parts)
-
+ stream = ASGIResponseStream(body_parts)
extensions = {}
- return (status_code, response_headers, response_stream(), extensions)
+ return (status_code, response_headers, stream, extensions)
A = typing.TypeVar("A", bound="AsyncBaseTransport")
+class SyncByteStream:
+ def __iter__(self) -> typing.Iterator[bytes]:
+ raise NotImplementedError(
+ "The '__iter__' method must be implemented."
+ ) # pragma: nocover
+ yield b"" # pragma: nocover
+
+ def close(self) -> None:
+ """
+ Subclasses can override this method to release any network resources
+ after a request/response cycle is complete.
+
+ Streaming cases should use a `try...finally` block to ensure that
+ the stream `close()` method is always called.
+
+ Example:
+
+ status_code, headers, stream, extensions = transport.handle_request(...)
+ try:
+ ...
+ finally:
+ stream.close()
+ """
+
+ def read(self) -> bytes:
+ """
+        Simple cases can use `.read()` as a convenience method for consuming
+ the entire stream and then closing it.
+
+ Example:
+
+ status_code, headers, stream, extensions = transport.handle_request(...)
+ body = stream.read()
+ """
+ try:
+ return b"".join([part for part in self])
+ finally:
+ self.close()
+
+
+class AsyncByteStream:
+ async def __aiter__(self) -> typing.AsyncIterator[bytes]:
+ raise NotImplementedError(
+ "The '__aiter__' method must be implemented."
+ ) # pragma: nocover
+ yield b"" # pragma: nocover
+
+ async def aclose(self) -> None:
+ pass
+
+ async def aread(self) -> bytes:
+ try:
+ return b"".join([part async for part in self])
+ finally:
+ await self.aclose()
+
+
class BaseTransport:
def __enter__(self: T) -> T:
return self
method: bytes,
url: typing.Tuple[bytes, bytes, typing.Optional[int], bytes],
headers: typing.List[typing.Tuple[bytes, bytes]],
- stream: typing.Iterable[bytes],
+ stream: SyncByteStream,
extensions: dict,
) -> typing.Tuple[
- int, typing.List[typing.Tuple[bytes, bytes]], typing.Iterable[bytes], dict
+ int, typing.List[typing.Tuple[bytes, bytes]], SyncByteStream, dict
]:
"""
Send a single HTTP request and return a response.
since the Client class provides all the higher level user-facing API
niceties.
+ In order to properly release any network resources, the response stream
+ should *either* be consumed immediately, with a call to `stream.read()`,
+ or else the `handle_request` call should be followed with a try/finally
+    block to ensure that the stream is always closed.
+
Example usage:
with httpx.HTTPTransport() as transport:
stream=[],
extensions={}
)
- try:
- body = b''.join([part for part in stream])
- finally:
- if 'close' in extensions:
- extensions['close']()
+ body = stream.read()
print(status_code, headers, body)
Arguments:
eg. the leading response bytes were b"HTTP/1.1 200 <CRLF>".
http_version: The HTTP version, as bytes. Eg. b"HTTP/1.1".
When no http_version key is included, HTTP/1.1 may be assumed.
- close: A callback which should be invoked to release any network
- resources.
- aclose: An async callback which should be invoked to release any
- network resources.
"""
raise NotImplementedError(
"The 'handle_request' method must be implemented."
method: bytes,
url: typing.Tuple[bytes, bytes, typing.Optional[int], bytes],
headers: typing.List[typing.Tuple[bytes, bytes]],
- stream: typing.AsyncIterable[bytes],
+ stream: AsyncByteStream,
extensions: dict,
) -> typing.Tuple[
- int, typing.List[typing.Tuple[bytes, bytes]], typing.AsyncIterable[bytes], dict
+ int, typing.List[typing.Tuple[bytes, bytes]], AsyncByteStream, dict
]:
raise NotImplementedError(
"The 'handle_async_request' method must be implemented."
WriteTimeout,
)
from .._types import CertTypes, VerifyTypes
-from .base import AsyncBaseTransport, BaseTransport
+from .base import AsyncBaseTransport, AsyncByteStream, BaseTransport, SyncByteStream
T = typing.TypeVar("T", bound="HTTPTransport")
A = typing.TypeVar("A", bound="AsyncHTTPTransport")
}
+class ResponseStream(SyncByteStream):
+ def __init__(self, httpcore_stream: httpcore.SyncByteStream):
+ self._httpcore_stream = httpcore_stream
+
+ def __iter__(self) -> typing.Iterator[bytes]:
+ with map_httpcore_exceptions():
+ for part in self._httpcore_stream:
+ yield part
+
+ def close(self) -> None:
+ with map_httpcore_exceptions():
+ self._httpcore_stream.close()
+
+
class HTTPTransport(BaseTransport):
def __init__(
self,
method: bytes,
url: typing.Tuple[bytes, bytes, typing.Optional[int], bytes],
headers: typing.List[typing.Tuple[bytes, bytes]],
- stream: typing.Iterable[bytes],
+ stream: SyncByteStream,
extensions: dict,
) -> typing.Tuple[
- int, typing.List[typing.Tuple[bytes, bytes]], typing.Iterable[bytes], dict
+ int, typing.List[typing.Tuple[bytes, bytes]], SyncByteStream, dict
]:
with map_httpcore_exceptions():
status_code, headers, byte_stream, extensions = self._pool.request(
ext=extensions,
)
- def response_stream() -> typing.Iterator[bytes]:
- with map_httpcore_exceptions():
- for part in byte_stream:
- yield part
-
- def close() -> None:
- with map_httpcore_exceptions():
- byte_stream.close()
-
ensure_http_version_reason_phrase_as_bytes(extensions)
- extensions["close"] = close
+ stream = ResponseStream(byte_stream)
- return status_code, headers, response_stream(), extensions
+ return status_code, headers, stream, extensions
def close(self) -> None:
self._pool.close()
+class AsyncResponseStream(AsyncByteStream):
+ def __init__(self, httpcore_stream: httpcore.AsyncByteStream):
+ self._httpcore_stream = httpcore_stream
+
+ async def __aiter__(self) -> typing.AsyncIterator[bytes]:
+ with map_httpcore_exceptions():
+ async for part in self._httpcore_stream:
+ yield part
+
+ async def aclose(self) -> None:
+ with map_httpcore_exceptions():
+ await self._httpcore_stream.aclose()
+
+
class AsyncHTTPTransport(AsyncBaseTransport):
def __init__(
self,
method: bytes,
url: typing.Tuple[bytes, bytes, typing.Optional[int], bytes],
headers: typing.List[typing.Tuple[bytes, bytes]],
- stream: typing.AsyncIterable[bytes],
+ stream: AsyncByteStream,
extensions: dict,
) -> typing.Tuple[
- int, typing.List[typing.Tuple[bytes, bytes]], typing.AsyncIterable[bytes], dict
+ int, typing.List[typing.Tuple[bytes, bytes]], AsyncByteStream, dict
]:
with map_httpcore_exceptions():
status_code, headers, byte_stream, extensions = await self._pool.arequest(
ext=extensions,
)
- async def response_stream() -> typing.AsyncIterator[bytes]:
- with map_httpcore_exceptions():
- async for part in byte_stream:
- yield part
-
- async def aclose() -> None:
- with map_httpcore_exceptions():
- await byte_stream.aclose()
-
ensure_http_version_reason_phrase_as_bytes(extensions)
- extensions["aclose"] = aclose
+ stream = AsyncResponseStream(byte_stream)
- return status_code, headers, response_stream(), extensions
+ return status_code, headers, stream, extensions
async def aclose(self) -> None:
await self._pool.aclose()
import typing
from .._models import Request
-from .base import AsyncBaseTransport, BaseTransport
+from .base import AsyncBaseTransport, AsyncByteStream, BaseTransport, SyncByteStream
class MockTransport(AsyncBaseTransport, BaseTransport):
method: bytes,
url: typing.Tuple[bytes, bytes, typing.Optional[int], bytes],
headers: typing.List[typing.Tuple[bytes, bytes]],
- stream: typing.Iterable[bytes],
+ stream: SyncByteStream,
extensions: dict,
) -> typing.Tuple[
- int, typing.List[typing.Tuple[bytes, bytes]], typing.Iterable[bytes], dict
+ int, typing.List[typing.Tuple[bytes, bytes]], SyncByteStream, dict
]:
request = Request(
method=method,
method: bytes,
url: typing.Tuple[bytes, bytes, typing.Optional[int], bytes],
headers: typing.List[typing.Tuple[bytes, bytes]],
- stream: typing.AsyncIterable[bytes],
+ stream: AsyncByteStream,
extensions: dict,
) -> typing.Tuple[
- int, typing.List[typing.Tuple[bytes, bytes]], typing.AsyncIterable[bytes], dict
+ int, typing.List[typing.Tuple[bytes, bytes]], AsyncByteStream, dict
]:
request = Request(
method=method,
import typing
from urllib.parse import unquote
-from .base import BaseTransport
+from .base import BaseTransport, SyncByteStream
def _skip_leading_empty_chunks(body: typing.Iterable) -> typing.Iterable:
return []
+class WSGIByteStream(SyncByteStream):
+ def __init__(self, result: typing.Iterable[bytes]) -> None:
+ self._result = _skip_leading_empty_chunks(result)
+
+ def __iter__(self) -> typing.Iterator[bytes]:
+ for part in self._result:
+ yield part
+
+
class WSGITransport(BaseTransport):
"""
A custom transport that handles sending requests directly to an WSGI app.
method: bytes,
url: typing.Tuple[bytes, bytes, typing.Optional[int], bytes],
headers: typing.List[typing.Tuple[bytes, bytes]],
- stream: typing.Iterable[bytes],
+ stream: SyncByteStream,
extensions: dict,
) -> typing.Tuple[
- int, typing.List[typing.Tuple[bytes, bytes]], typing.Iterable[bytes], dict
+ int, typing.List[typing.Tuple[bytes, bytes]], SyncByteStream, dict
]:
wsgi_input = io.BytesIO(b"".join(stream))
seen_exc_info = exc_info
result = self.app(environ, start_response)
- # This is needed because the status returned by start_response
- # shouldn't be used until the first non-empty chunk has been served.
- result = _skip_leading_empty_chunks(result)
+
+ stream = WSGIByteStream(result)
assert seen_status is not None
assert seen_response_headers is not None
]
extensions = {}
- return (status_code, headers, result, extensions)
+ return (status_code, headers, stream, extensions)
None,
]
-ByteStream = Union[Iterable[bytes], AsyncIterable[bytes]]
-RequestContent = Union[str, bytes, ByteStream]
-ResponseContent = Union[str, bytes, ByteStream]
+RequestContent = Union[str, bytes, Iterable[bytes], AsyncIterable[bytes]]
+ResponseContent = Union[str, bytes, Iterable[bytes], AsyncIterable[bytes]]
RequestData = dict
import pytest
-from httpx import StreamConsumed
+import httpx
from httpx._content import encode_request, encode_response
@pytest.mark.asyncio
async def test_empty_content():
headers, stream = encode_request()
- assert isinstance(stream, typing.Iterable)
- assert isinstance(stream, typing.AsyncIterable)
+ assert isinstance(stream, httpx.SyncByteStream)
+ assert isinstance(stream, httpx.AsyncByteStream)
- sync_content = b"".join([part for part in stream])
- async_content = b"".join([part async for part in stream])
+ sync_content = stream.read()
+ async_content = await stream.aread()
assert headers == {}
assert sync_content == b""
assert headers == {"Transfer-Encoding": "chunked"}
assert content == b"Hello, world!"
- with pytest.raises(StreamConsumed):
+ with pytest.raises(httpx.StreamConsumed):
[part for part in stream]
# Support 'data' for compat with requests.
assert headers == {"Transfer-Encoding": "chunked"}
assert content == b"Hello, world!"
- with pytest.raises(StreamConsumed):
+ with pytest.raises(httpx.StreamConsumed):
[part async for part in stream]
# Support 'data' for compat with requests.
assert headers == {"Transfer-Encoding": "chunked"}
assert content == b"Hello, world!"
- with pytest.raises(StreamConsumed):
+ with pytest.raises(httpx.StreamConsumed):
[part for part in stream]
assert headers == {"Transfer-Encoding": "chunked"}
assert content == b"Hello, world!"
- with pytest.raises(StreamConsumed):
+ with pytest.raises(httpx.StreamConsumed):
[part async for part in stream]