import httpx
from httpx.content_streams import AsyncIteratorStream
+REQUEST = httpx.Request("GET", "https://example.org")
+
def streaming_body():
yield b"Hello, "
def test_response():
- response = httpx.Response(200, content=b"Hello, world!")
+ response = httpx.Response(200, content=b"Hello, world!", request=REQUEST)
assert response.status_code == 200
assert response.reason_phrase == "OK"
assert response.text == "Hello, world!"
+ assert response.request is REQUEST
assert response.elapsed == datetime.timedelta(0)
assert not response.is_error
def test_response_repr():
- response = httpx.Response(200, content=b"Hello, world!")
+ response = httpx.Response(200, content=b"Hello, world!", request=REQUEST)
assert repr(response) == "<Response [200 OK]>"
"""
headers = {"Content-Type": "text-plain; charset=latin-1"}
content = "Latin 1: ÿ".encode("latin-1")
- response = httpx.Response(200, content=content, headers=headers)
+ response = httpx.Response(200, content=content, headers=headers, request=REQUEST)
assert response.text == "Latin 1: ÿ"
assert response.encoding == "latin-1"
Autodetect encoding if there is no charset info in a Content-Type header.
"""
content = "おはようございます。".encode("EUC-JP")
- response = httpx.Response(200, content=content)
+ response = httpx.Response(200, content=content, request=REQUEST)
assert response.text == "おはようございます。"
assert response.encoding == "EUC-JP"
"""
headers = {"Content-Type": "text-plain; charset=invalid-codec-name"}
content = "おはようございます。".encode("EUC-JP")
- response = httpx.Response(200, content=content, headers=headers)
+ response = httpx.Response(200, content=content, headers=headers, request=REQUEST)
assert response.text == "おはようございます。"
assert response.encoding == "EUC-JP"
"""
content = b"Hello, world!"
headers = {"Content-Type": "text/plain"}
- response = httpx.Response(200, content=content, headers=headers)
+ response = httpx.Response(200, content=content, headers=headers, request=REQUEST)
assert response.status_code == 200
assert response.encoding == "iso-8859-1"
assert response.text == "Hello, world!"
"""
Default to utf-8 if all else fails.
"""
- response = httpx.Response(200, content=b"")
+ response = httpx.Response(200, content=b"", request=REQUEST)
assert response.text == ""
assert response.encoding == "utf-8"
Default to apparent encoding for non-text content-type headers.
"""
headers = {"Content-Type": "image/png"}
- response = httpx.Response(200, content=b"xyz", headers=headers)
+ response = httpx.Response(200, content=b"xyz", headers=headers, request=REQUEST)
assert response.text == "xyz"
assert response.encoding == "ascii"
"Content-Type": "text-plain; charset=utf-8"
} # Deliberately incorrect charset
response = httpx.Response(
- 200, content="Latin 1: ÿ".encode("latin-1"), headers=headers
+ 200, content="Latin 1: ÿ".encode("latin-1"), headers=headers, request=REQUEST,
)
response.encoding = "latin-1"
assert response.text == "Latin 1: ÿ"
def test_response_force_encoding():
- response = httpx.Response(200, content="Snowman: ☃".encode("utf-8"))
+ response = httpx.Response(
+ 200, content="Snowman: ☃".encode("utf-8"), request=REQUEST
+ )
response.encoding = "iso-8859-1"
assert response.status_code == 200
assert response.reason_phrase == "OK"
@pytest.mark.asyncio
async def test_read_response():
- response = httpx.Response(200, content=b"Hello, world!")
+ response = httpx.Response(200, content=b"Hello, world!", request=REQUEST)
assert response.status_code == 200
assert response.text == "Hello, world!"
@pytest.mark.asyncio
async def test_raw_interface():
- response = httpx.Response(200, content=b"Hello, world!")
+ response = httpx.Response(200, content=b"Hello, world!", request=REQUEST)
raw = b""
async for part in response.aiter_raw():
@pytest.mark.asyncio
async def test_bytes_interface():
- response = httpx.Response(200, content=b"Hello, world!")
+ response = httpx.Response(200, content=b"Hello, world!", request=REQUEST)
content = b""
async for part in response.aiter_bytes():
@pytest.mark.asyncio
async def test_text_interface():
- response = httpx.Response(200, content=b"Hello, world!")
+ response = httpx.Response(200, content=b"Hello, world!", request=REQUEST)
await response.read()
@pytest.mark.asyncio
async def test_lines_interface():
- response = httpx.Response(200, content=b"Hello,\nworld!")
+ response = httpx.Response(200, content=b"Hello,\nworld!", request=REQUEST)
await response.read()
@pytest.mark.asyncio
async def test_stream_interface_after_read():
- response = httpx.Response(200, content=b"Hello, world!")
+ response = httpx.Response(200, content=b"Hello, world!", request=REQUEST)
await response.read()
@pytest.mark.asyncio
async def test_streaming_response():
stream = AsyncIteratorStream(aiterator=async_streaming_body())
- response = httpx.Response(200, stream=stream)
+ response = httpx.Response(200, stream=stream, request=REQUEST)
assert response.status_code == 200
assert not response.is_closed
@pytest.mark.asyncio
async def test_cannot_read_after_stream_consumed():
stream = AsyncIteratorStream(aiterator=async_streaming_body())
- response = httpx.Response(200, stream=stream)
+ response = httpx.Response(200, stream=stream, request=REQUEST)
content = b""
async for part in response.aiter_bytes():
@pytest.mark.asyncio
async def test_cannot_read_after_response_closed():
stream = AsyncIteratorStream(aiterator=async_streaming_body())
- response = httpx.Response(200, stream=stream)
+ response = httpx.Response(200, stream=stream, request=REQUEST)
await response.close()
def test_unknown_status_code():
- response = httpx.Response(600)
+ response = httpx.Response(600, request=REQUEST)
assert response.status_code == 600
assert response.reason_phrase == ""
assert response.text == ""
data = {"greeting": "hello", "recipient": "world"}
content = json.dumps(data).encode("utf-16")
headers = {"Content-Type": "application/json, charset=utf-16"}
- response = httpx.Response(200, content=content, headers=headers)
+ response = httpx.Response(200, content=content, headers=headers, request=REQUEST)
assert response.json() == data
data = {"greeting": "hello", "recipient": "world", "amount": 1}
content = json.dumps(data).encode("utf-16")
headers = {"Content-Type": "application/json, charset=utf-16"}
- response = httpx.Response(200, content=content, headers=headers)
+ response = httpx.Response(200, content=content, headers=headers, request=REQUEST)
assert response.json(parse_int=str)["amount"] == "1"
data = {"greeting": "hello", "recipient": "world"}
content = json.dumps(data).encode("utf-32-be")
headers = {"Content-Type": "application/json"}
- response = httpx.Response(200, content=content, headers=headers)
+ response = httpx.Response(200, content=content, headers=headers, request=REQUEST)
assert response.json() == data
headers = {"Content-Type": "application/json"}
# force incorrect guess from `guess_json_utf` to trigger error
with mock.patch("httpx.models.guess_json_utf", return_value="utf-32"):
- response = httpx.Response(200, content=content, headers=headers)
+ response = httpx.Response(
+ 200, content=content, headers=headers, request=REQUEST
+ )
with pytest.raises(json.JSONDecodeError):
response.json()
],
)
def test_link_headers(headers, expected):
- response = httpx.Response(200, content=None, headers=headers)
+ response = httpx.Response(200, content=None, headers=headers, request=REQUEST)
assert response.links == expected
TextDecoder,
)
+REQUEST = httpx.Request("GET", "https://example.org")
+
def test_deflate():
body = b"test 123"
compressed_body = compressor.compress(body) + compressor.flush()
headers = [(b"Content-Encoding", b"deflate")]
- response = httpx.Response(200, headers=headers, content=compressed_body)
+ response = httpx.Response(
+ 200, headers=headers, content=compressed_body, request=REQUEST
+ )
assert response.content == body
compressed_body = compressor.compress(body) + compressor.flush()
headers = [(b"Content-Encoding", b"gzip")]
- response = httpx.Response(200, headers=headers, content=compressed_body)
+ response = httpx.Response(
+ 200, headers=headers, content=compressed_body, request=REQUEST
+ )
assert response.content == body
compressed_body = brotli.compress(body)
headers = [(b"Content-Encoding", b"br")]
- response = httpx.Response(200, headers=headers, content=compressed_body)
+ response = httpx.Response(
+ 200, headers=headers, content=compressed_body, request=REQUEST
+ )
assert response.content == body
)
headers = [(b"Content-Encoding", b"deflate, gzip")]
- response = httpx.Response(200, headers=headers, content=compressed_body)
+ response = httpx.Response(
+ 200, headers=headers, content=compressed_body, request=REQUEST
+ )
assert response.content == body
compressed_body = brotli.compress(body)
headers = [(b"Content-Encoding", b"br, identity")]
- response = httpx.Response(200, headers=headers, content=compressed_body)
+ response = httpx.Response(
+ 200, headers=headers, content=compressed_body, request=REQUEST
+ )
assert response.content == body
headers = [(b"Content-Encoding", b"identity, br")]
- response = httpx.Response(200, headers=headers, content=compressed_body)
+ response = httpx.Response(
+ 200, headers=headers, content=compressed_body, request=REQUEST
+ )
assert response.content == body
headers = [(b"Content-Encoding", b"gzip")]
stream = AsyncIteratorStream(aiterator=compress(body))
- response = httpx.Response(200, headers=headers, stream=stream)
+ response = httpx.Response(200, headers=headers, stream=stream, request=REQUEST)
assert not hasattr(response, "body")
assert await response.read() == body
@pytest.mark.parametrize("header_value", (b"deflate", b"gzip", b"br", b"identity"))
def test_empty_content(header_value):
headers = [(b"Content-Encoding", header_value)]
- response = httpx.Response(200, headers=headers, content=b"")
+ response = httpx.Response(200, headers=headers, content=b"", request=REQUEST)
assert response.content == b""
body = b"test 123"
compressed_body = brotli.compress(body)[3:]
with pytest.raises(httpx.DecodingError):
- response = httpx.Response(200, headers=headers, content=compressed_body)
+ response = httpx.Response(
+ 200, headers=headers, content=compressed_body, request=REQUEST
+ )
response.content
yield chunk
stream = AsyncIteratorStream(aiterator=iterator())
- response = httpx.Response(200, stream=stream)
+ response = httpx.Response(200, stream=stream, request=REQUEST)
await response.read()
assert response.text == (b"".join(data)).decode(encoding)
200,
headers=[(b"Content-Type", b"text/html; charset=shift-jis")],
stream=stream,
+ request=REQUEST,
)
await response.read()
headers = [(b"Content-Encoding", b"invalid-header")]
body = b"test 123"
- response = httpx.Response(200, headers=headers, content=body)
+ response = httpx.Response(200, headers=headers, content=body, request=REQUEST)
assert response.content == body