class ResponseNotRead(StreamError):
"""
- Attempted to access response content, without having called `read()`
- after a streaming response.
+ Attempted to access streaming response content, without having called `read()`.
"""
def __init__(self) -> None:
- message = (
- "Attempted to access response content, without having called `read()` "
- "after a streaming response."
- )
+ message = "Attempted to access streaming response content, without having called `read()`."
super().__init__(message)
class RequestNotRead(StreamError):
"""
- Attempted to access request content, without having called `read()`.
+ Attempted to access streaming request content, without having called `read()`.
"""
def __init__(self) -> None:
- message = "Attempted to access request content, without having called `read()`."
+ message = "Attempted to access streaming request content, without having called `read()`."
super().__init__(message)
headers, stream = encode_request(content, data, files, json)
self._prepare(headers)
self.stream = stream
+ # Load the request body, except for streaming content.
+ if isinstance(stream, ByteStream):
+ self.read()
def _prepare(self, default_headers: typing.Dict[str, str]) -> None:
for key, value in default_headers.items():
if not hasattr(self, "_content"):
assert isinstance(self.stream, typing.Iterable)
self._content = b"".join(self.stream)
- # If a streaming request has been read entirely into memory, then
- # we can replace the stream with a raw bytes implementation,
- # to ensure that any non-replayable streams can still be used.
- self.stream = ByteStream(self._content)
+ if not isinstance(self.stream, ByteStream):
+ # If a streaming request has been read entirely into memory, then
+ # we can replace the stream with a raw bytes implementation,
+ # to ensure that any non-replayable streams can still be used.
+ self.stream = ByteStream(self._content)
return self._content
async def aread(self) -> bytes:
if not hasattr(self, "_content"):
assert isinstance(self.stream, typing.AsyncIterable)
self._content = b"".join([part async for part in self.stream])
- # If a streaming request has been read entirely into memory, then
- # we can replace the stream with a raw bytes implementation,
- # to ensure that any non-replayable streams can still be used.
- self.stream = ByteStream(self._content)
+ if not isinstance(self.stream, ByteStream):
+ # If a streaming request has been read entirely into memory, then
+ # we can replace the stream with a raw bytes implementation,
+ # to ensure that any non-replayable streams can still be used.
+ self.stream = ByteStream(self._content)
return self._content
def __repr__(self) -> str:
headers, stream = encode_response(content, text, html, json)
self._prepare(headers)
self.stream = stream
- if content is None or isinstance(content, (bytes, str)):
+ if isinstance(stream, ByteStream):
# Load the response body, except for streaming content.
self.read()
assert content == request.content
-@pytest.mark.asyncio
-async def test_cannot_access_content_without_read():
- # Ensure a request may still be streamed if it has been read.
- # Needed for cases such as authentication classes that read the request body.
- request = httpx.Request("POST", "http://example.org", json={"test": 123})
+def test_cannot_access_streaming_content_without_read():
+ # Ensure that streaming requests
+ def streaming_body(): # pragma: nocover
+ yield ""
+
+ request = httpx.Request("POST", "http://example.org", content=streaming_body())
with pytest.raises(httpx.RequestNotRead):
request.content