import httpx
from httpx._content_streams import AsyncIteratorStream, IteratorStream
-REQUEST = httpx.Request("GET", "https://example.org")
-
def streaming_body():
yield b"Hello, "
def test_response():
- response = httpx.Response(200, content=b"Hello, world!", request=REQUEST)
+ response = httpx.Response(
+ 200,
+ content=b"Hello, world!",
+ request=httpx.Request("GET", "https://example.org"),
+ )
assert response.status_code == 200
assert response.reason_phrase == "OK"
assert response.text == "Hello, world!"
- assert response.request is REQUEST
+ assert response.request.method == "GET"
+ assert response.request.url == "https://example.org"
assert response.elapsed >= datetime.timedelta(0)
assert not response.is_error
def test_raise_for_status():
+ request = httpx.Request("GET", "https://example.org")
+
# 2xx status codes are not an error.
- response = httpx.Response(200, request=REQUEST)
+ response = httpx.Response(200, request=request)
response.raise_for_status()
# 4xx status codes are a client error.
- response = httpx.Response(403, request=REQUEST)
+ response = httpx.Response(403, request=request)
with pytest.raises(httpx.HTTPStatusError):
response.raise_for_status()
# 5xx status codes are a server error.
- response = httpx.Response(500, request=REQUEST)
+ response = httpx.Response(500, request=request)
with pytest.raises(httpx.HTTPStatusError):
response.raise_for_status()
def test_response_repr():
- response = httpx.Response(200, content=b"Hello, world!", request=REQUEST)
+ response = httpx.Response(
+ 200,
+ content=b"Hello, world!",
+ )
assert repr(response) == "<Response [200 OK]>"
"""
headers = {"Content-Type": "text-plain; charset=latin-1"}
content = "Latin 1: ÿ".encode("latin-1")
- response = httpx.Response(200, content=content, headers=headers, request=REQUEST)
+ response = httpx.Response(
+ 200,
+ content=content,
+ headers=headers,
+ )
assert response.text == "Latin 1: ÿ"
assert response.encoding == "latin-1"
Autodetect encoding if there is no charset info in a Content-Type header.
"""
content = "おはようございます。".encode("EUC-JP")
- response = httpx.Response(200, content=content, request=REQUEST)
+ response = httpx.Response(
+ 200,
+ content=content,
+ )
assert response.text == "おはようございます。"
assert response.encoding == "EUC-JP"
"""
headers = {"Content-Type": "text-plain; charset=invalid-codec-name"}
content = "おはようございます。".encode("EUC-JP")
- response = httpx.Response(200, content=content, headers=headers, request=REQUEST)
+ response = httpx.Response(
+ 200,
+ content=content,
+ headers=headers,
+ )
assert response.text == "おはようございます。"
assert response.encoding == "EUC-JP"
"""
content = b"Hello, world!"
headers = {"Content-Type": "text/plain"}
- response = httpx.Response(200, content=content, headers=headers, request=REQUEST)
+ response = httpx.Response(
+ 200,
+ content=content,
+ headers=headers,
+ )
assert response.status_code == 200
assert response.encoding == "iso-8859-1"
assert response.text == "Hello, world!"
"""
Default to utf-8 if all else fails.
"""
- response = httpx.Response(200, content=b"", request=REQUEST)
+ response = httpx.Response(
+ 200,
+ content=b"",
+ )
assert response.text == ""
assert response.encoding == "utf-8"
Default to apparent encoding for non-text content-type headers.
"""
headers = {"Content-Type": "image/png"}
- response = httpx.Response(200, content=b"xyz", headers=headers, request=REQUEST)
+ response = httpx.Response(
+ 200,
+ content=b"xyz",
+ headers=headers,
+ )
assert response.text == "xyz"
assert response.encoding == "ascii"
"Content-Type": "text-plain; charset=utf-8"
} # Deliberately incorrect charset
response = httpx.Response(
- 200, content="Latin 1: ÿ".encode("latin-1"), headers=headers, request=REQUEST
+ 200,
+ content="Latin 1: ÿ".encode("latin-1"),
+ headers=headers,
)
response.encoding = "latin-1"
assert response.text == "Latin 1: ÿ"
def test_response_force_encoding():
response = httpx.Response(
- 200, content="Snowman: ☃".encode("utf-8"), request=REQUEST
+ 200,
+ content="Snowman: ☃".encode("utf-8"),
)
response.encoding = "iso-8859-1"
assert response.status_code == 200
def test_read():
- response = httpx.Response(200, content=b"Hello, world!", request=REQUEST)
+ response = httpx.Response(
+ 200,
+ content=b"Hello, world!",
+ )
assert response.status_code == 200
assert response.text == "Hello, world!"
@pytest.mark.asyncio
async def test_aread():
- response = httpx.Response(200, content=b"Hello, world!", request=REQUEST)
+ response = httpx.Response(
+ 200,
+ content=b"Hello, world!",
+ )
assert response.status_code == 200
assert response.text == "Hello, world!"
def test_iter_raw():
stream = IteratorStream(iterator=streaming_body())
- response = httpx.Response(200, stream=stream, request=REQUEST)
+ response = httpx.Response(
+ 200,
+ stream=stream,
+ )
raw = b""
for part in response.iter_raw():
@pytest.mark.asyncio
async def test_aiter_raw():
stream = AsyncIteratorStream(aiterator=async_streaming_body())
- response = httpx.Response(200, stream=stream, request=REQUEST)
+ response = httpx.Response(
+ 200,
+ stream=stream,
+ )
raw = b""
async for part in response.aiter_raw():
def test_iter_bytes():
- response = httpx.Response(200, content=b"Hello, world!", request=REQUEST)
+ response = httpx.Response(
+ 200,
+ content=b"Hello, world!",
+ )
content = b""
for part in response.iter_bytes():
@pytest.mark.asyncio
async def test_aiter_bytes():
- response = httpx.Response(200, content=b"Hello, world!", request=REQUEST)
+ response = httpx.Response(
+ 200,
+ content=b"Hello, world!",
+ )
content = b""
async for part in response.aiter_bytes():
def test_iter_text():
- response = httpx.Response(200, content=b"Hello, world!", request=REQUEST)
+ response = httpx.Response(
+ 200,
+ content=b"Hello, world!",
+ )
content = ""
for part in response.iter_text():
@pytest.mark.asyncio
async def test_aiter_text():
- response = httpx.Response(200, content=b"Hello, world!", request=REQUEST)
+ response = httpx.Response(
+ 200,
+ content=b"Hello, world!",
+ )
content = ""
async for part in response.aiter_text():
def test_iter_lines():
- response = httpx.Response(200, content=b"Hello,\nworld!", request=REQUEST)
+ response = httpx.Response(
+ 200,
+ content=b"Hello,\nworld!",
+ )
content = []
for line in response.iter_lines():
@pytest.mark.asyncio
async def test_aiter_lines():
- response = httpx.Response(200, content=b"Hello,\nworld!", request=REQUEST)
+ response = httpx.Response(
+ 200,
+ content=b"Hello,\nworld!",
+ )
content = []
async for line in response.aiter_lines():
def test_sync_streaming_response():
stream = IteratorStream(iterator=streaming_body())
- response = httpx.Response(200, stream=stream, request=REQUEST)
+ response = httpx.Response(
+ 200,
+ stream=stream,
+ )
assert response.status_code == 200
assert not response.is_closed
@pytest.mark.asyncio
async def test_async_streaming_response():
stream = AsyncIteratorStream(aiterator=async_streaming_body())
- response = httpx.Response(200, stream=stream, request=REQUEST)
+ response = httpx.Response(
+ 200,
+ stream=stream,
+ )
assert response.status_code == 200
assert not response.is_closed
def test_cannot_read_after_stream_consumed():
stream = IteratorStream(iterator=streaming_body())
- response = httpx.Response(200, stream=stream, request=REQUEST)
+ response = httpx.Response(
+ 200,
+ stream=stream,
+ )
content = b""
for part in response.iter_bytes():
@pytest.mark.asyncio
async def test_cannot_aread_after_stream_consumed():
stream = AsyncIteratorStream(aiterator=async_streaming_body())
- response = httpx.Response(200, stream=stream, request=REQUEST)
+ response = httpx.Response(
+ 200,
+ stream=stream,
+ )
content = b""
async for part in response.aiter_bytes():
is_closed = True
stream = IteratorStream(iterator=streaming_body(), close_func=close_func)
- response = httpx.Response(200, stream=stream, request=REQUEST)
+ response = httpx.Response(
+ 200,
+ stream=stream,
+ )
response.close()
assert is_closed
stream = AsyncIteratorStream(
aiterator=async_streaming_body(), close_func=close_func
)
- response = httpx.Response(200, stream=stream, request=REQUEST)
+ response = httpx.Response(
+ 200,
+ stream=stream,
+ )
await response.aclose()
assert is_closed
@pytest.mark.asyncio
async def test_elapsed_not_available_until_closed():
stream = AsyncIteratorStream(aiterator=async_streaming_body())
- response = httpx.Response(200, stream=stream, request=REQUEST)
+ response = httpx.Response(
+ 200,
+ stream=stream,
+ )
with pytest.raises(RuntimeError):
response.elapsed
def test_unknown_status_code():
- response = httpx.Response(600, request=REQUEST)
+ response = httpx.Response(
+ 600,
+ )
assert response.status_code == 600
assert response.reason_phrase == ""
assert response.text == ""
data = {"greeting": "hello", "recipient": "world"}
content = json.dumps(data).encode("utf-16")
headers = {"Content-Type": "application/json, charset=utf-16"}
- response = httpx.Response(200, content=content, headers=headers, request=REQUEST)
+ response = httpx.Response(
+ 200,
+ content=content,
+ headers=headers,
+ )
assert response.json() == data
data = {"greeting": "hello", "recipient": "world", "amount": 1}
content = json.dumps(data).encode("utf-16")
headers = {"Content-Type": "application/json, charset=utf-16"}
- response = httpx.Response(200, content=content, headers=headers, request=REQUEST)
+ response = httpx.Response(
+ 200,
+ content=content,
+ headers=headers,
+ )
assert response.json(parse_int=str)["amount"] == "1"
data = {"greeting": "hello", "recipient": "world"}
content = json.dumps(data).encode("utf-32-be")
headers = {"Content-Type": "application/json"}
- response = httpx.Response(200, content=content, headers=headers, request=REQUEST)
+ response = httpx.Response(
+ 200,
+ content=content,
+ headers=headers,
+ )
assert response.json() == data
# force incorrect guess from `guess_json_utf` to trigger error
with mock.patch("httpx._models.guess_json_utf", return_value="utf-32"):
response = httpx.Response(
- 200, content=content, headers=headers, request=REQUEST
+ 200,
+ content=content,
+ headers=headers,
)
with pytest.raises(json.decoder.JSONDecodeError):
response.json()
],
)
def test_link_headers(headers, expected):
- response = httpx.Response(200, content=None, headers=headers, request=REQUEST)
+ response = httpx.Response(
+ 200,
+ content=None,
+ headers=headers,
+ )
assert response.links == expected
headers = [(b"Content-Encoding", header_value)]
body = b"test 123"
compressed_body = brotli.compress(body)[3:]
+ with pytest.raises(ValueError):
+ httpx.Response(
+ 200,
+ headers=headers,
+ content=compressed_body,
+ )
+
with pytest.raises(httpx.DecodingError):
- httpx.Response(200, headers=headers, content=compressed_body, request=REQUEST)
+ httpx.Response(
+ 200,
+ headers=headers,
+ content=compressed_body,
+ request=httpx.Request("GET", "https://www.example.org/"),
+ )
@pytest.mark.parametrize("header_value", (b"deflate", b"gzip", b"br"))
def test_set_request_after_init():
response = httpx.Response(200, content=b"Hello, world!")
- response.request = REQUEST
+ response.request = httpx.Request("GET", "https://www.example.org")
- assert response.request == REQUEST
+ assert response.request.method == "GET"
+ assert response.request.url == "https://www.example.org"
def test_cannot_access_unset_request():
response = httpx.Response(200, content=b"Hello, world!")
with pytest.raises(RuntimeError):
- assert response.request is not None
+ response.request
TextDecoder,
)
-REQUEST = httpx.Request("GET", "https://example.org")
-
def test_deflate():
"""
headers = [(b"Content-Encoding", b"deflate")]
response = httpx.Response(
- 200, headers=headers, content=compressed_body, request=REQUEST
+ 200,
+ headers=headers,
+ content=compressed_body,
)
assert response.content == body
headers = [(b"Content-Encoding", b"deflate")]
response = httpx.Response(
- 200, headers=headers, content=compressed_body, request=REQUEST
+ 200,
+ headers=headers,
+ content=compressed_body,
)
assert response.content == body
headers = [(b"Content-Encoding", b"gzip")]
response = httpx.Response(
- 200, headers=headers, content=compressed_body, request=REQUEST
+ 200,
+ headers=headers,
+ content=compressed_body,
)
assert response.content == body
headers = [(b"Content-Encoding", b"br")]
response = httpx.Response(
- 200, headers=headers, content=compressed_body, request=REQUEST
+ 200,
+ headers=headers,
+ content=compressed_body,
)
assert response.content == body
headers = [(b"Content-Encoding", b"deflate, gzip")]
response = httpx.Response(
- 200, headers=headers, content=compressed_body, request=REQUEST
+ 200,
+ headers=headers,
+ content=compressed_body,
)
assert response.content == body
headers = [(b"Content-Encoding", b"br, identity")]
response = httpx.Response(
- 200, headers=headers, content=compressed_body, request=REQUEST
+ 200,
+ headers=headers,
+ content=compressed_body,
)
assert response.content == body
headers = [(b"Content-Encoding", b"identity, br")]
response = httpx.Response(
- 200, headers=headers, content=compressed_body, request=REQUEST
+ 200,
+ headers=headers,
+ content=compressed_body,
)
assert response.content == body
headers = [(b"Content-Encoding", b"gzip")]
stream = AsyncIteratorStream(aiterator=compress(body))
- response = httpx.Response(200, headers=headers, stream=stream, request=REQUEST)
+ response = httpx.Response(
+ 200,
+ headers=headers,
+ stream=stream,
+ )
assert not hasattr(response, "body")
assert await response.aread() == body
@pytest.mark.parametrize("header_value", (b"deflate", b"gzip", b"br", b"identity"))
def test_empty_content(header_value):
headers = [(b"Content-Encoding", header_value)]
- response = httpx.Response(200, headers=headers, content=b"", request=REQUEST)
+ response = httpx.Response(
+ 200,
+ headers=headers,
+ content=b"",
+ )
assert response.content == b""
body = b"test 123"
compressed_body = brotli.compress(body)[3:]
with pytest.raises(httpx.DecodingError):
- httpx.Response(200, headers=headers, content=compressed_body, request=REQUEST)
+ request = httpx.Request("GET", "https://example.org")
+ httpx.Response(200, headers=headers, content=compressed_body, request=request)
+
+ with pytest.raises(ValueError):
+ httpx.Response(200, headers=headers, content=compressed_body)
@pytest.mark.parametrize(
# Accessing `.text` on a read response.
stream = AsyncIteratorStream(aiterator=iterator())
- response = httpx.Response(200, stream=stream, request=REQUEST)
+ response = httpx.Response(
+ 200,
+ stream=stream,
+ )
await response.aread()
assert response.text == (b"".join(data)).decode(encoding)
# Streaming `.aiter_text` iteratively.
stream = AsyncIteratorStream(aiterator=iterator())
- response = httpx.Response(200, stream=stream, request=REQUEST)
+ response = httpx.Response(
+ 200,
+ stream=stream,
+ )
text = "".join([part async for part in response.aiter_text()])
assert text == (b"".join(data)).decode(encoding)
200,
headers=[(b"Content-Type", b"text/html; charset=shift-jis")],
stream=stream,
- request=REQUEST,
)
await response.aread()
headers = [(b"Content-Encoding", b"invalid-header")]
body = b"test 123"
- response = httpx.Response(200, headers=headers, content=body, request=REQUEST)
+ response = httpx.Response(
+ 200,
+ headers=headers,
+ content=body,
+ )
assert response.content == body