${PREFIX}ruff format $SOURCE_FILES --diff
${PREFIX}mypy $SOURCE_FILES
${PREFIX}ruff check $SOURCE_FILES
+${PREFIX}python scripts/unasync.py --check
${PREFIX}ruff check --fix $SOURCE_FILES
${PREFIX}ruff format $SOURCE_FILES
+${PREFIX}python scripts/unasync.py
--- /dev/null
+#!venv/bin/python
+import os
+import re
+import sys
+from pprint import pprint
+
+SUBS = [
+ # httpx specific
+ ('AsyncByteStream', 'SyncByteStream'),
+ ('async_auth_flow', 'sync_auth_flow'),
+ ('handle_async_request', 'handle_request'),
+ # general
+ ('AsyncIterator', 'Iterator'),
+ ('from anyio import Lock', 'from threading import Lock'),
+ ('Async([A-Z][A-Za-z0-9_]*)', r'\2'),
+ ('async def', 'def'),
+ ('async with', 'with'),
+ ('async for', 'for'),
+ ('await ', ''),
+ ('aclose', 'close'),
+ ('aread', 'read'),
+ ('__aenter__', '__enter__'),
+ ('__aexit__', '__exit__'),
+ ('__aiter__', '__iter__'),
+ ('@pytest.mark.anyio', ''),
+]
+COMPILED_SUBS = [
+ (re.compile(r'(^|\b)' + regex + r'($|\b)'), repl)
+ for regex, repl in SUBS
+]
+
+USED_SUBS = set()
+
+def unasync_line(line):
+ for index, (regex, repl) in enumerate(COMPILED_SUBS):
+ old_line = line
+ line = re.sub(regex, repl, line)
+ if old_line != line:
+ USED_SUBS.add(index)
+ return line
+
+
+def unasync_file(in_path, out_path):
+ with open(in_path, "r") as in_file:
+ with open(out_path, "w", newline="") as out_file:
+ for line in in_file.readlines():
+ line = unasync_line(line)
+ out_file.write(line)
+
+
+def unasync_file_check(in_path, out_path):
+ with open(in_path, "r") as in_file:
+ with open(out_path, "r") as out_file:
+ for in_line, out_line in zip(in_file.readlines(), out_file.readlines()):
+ expected = unasync_line(in_line)
+ if out_line != expected:
+ print(f'unasync mismatch between {in_path!r} and {out_path!r}')
+ print(f'Async code: {in_line!r}')
+ print(f'Expected sync code: {expected!r}')
+ print(f'Actual sync code: {out_line!r}')
+ sys.exit(1)
+
+
+def unasync_dir(in_dir, out_dir, check_only=False):
+ for dirpath, dirnames, filenames in os.walk(in_dir):
+ for filename in filenames:
+ if not filename.endswith('.py'):
+ continue
+ rel_dir = os.path.relpath(dirpath, in_dir)
+ in_path = os.path.normpath(os.path.join(in_dir, rel_dir, filename))
+ out_path = os.path.normpath(os.path.join(out_dir, rel_dir, filename))
+ print(in_path, '->', out_path)
+ if check_only:
+ unasync_file_check(in_path, out_path)
+ else:
+ unasync_file(in_path, out_path)
+
+
+def main():
+ check_only = '--check' in sys.argv
+ unasync_dir("tests/client/async", "tests/client/sync", check_only=check_only)
+
+ if len(USED_SUBS) != len(SUBS):
+ unused_subs = [SUBS[i] for i in range(len(SUBS)) if i not in USED_SUBS]
+
+ print("These patterns were not used:")
+ pprint(unused_subs)
+ exit(1)
+
+
+if __name__ == '__main__':
+ main()
import typing
from urllib.request import parse_keqv_list
-import anyio
import pytest
+from anyio import Lock
import httpx
-from ..common import FIXTURES_DIR
+from ...common import FIXTURES_DIR
class App:
"""
def __init__(self) -> None:
- self._async_lock = anyio.Lock()
+ self._lock = Lock()
- async def async_auth_flow(
- self, request: httpx.Request
- ) -> typing.AsyncGenerator[httpx.Request, httpx.Response]:
- async with self._async_lock:
- request.headers["Authorization"] = "async-auth"
+ async def async_auth_flow(self, request: httpx.Request) -> typing.Any:
+ async with self._lock:
+ request.headers["Authorization"] = "auth"
yield request
response = await client.get(url, auth=auth)
assert response.status_code == 200
- assert response.json() == {"auth": "async-auth"}
+ assert response.json() == {"auth": "auth"}
assert response.status_code == 200
-@pytest.mark.anyio
-async def test_cannot_stream_sync_request(server):
- def hello_world() -> typing.Iterator[bytes]: # pragma: no cover
- yield b"Hello, "
- yield b"world!"
-
- async with httpx.AsyncClient() as client:
- with pytest.raises(RuntimeError):
- await client.post(server.url, content=hello_world())
-
-
@pytest.mark.anyio
async def test_raise_for_status(server):
async with httpx.AsyncClient() as client:
@pytest.mark.anyio
-async def test_async_mock_transport():
+async def test_mock_transport():
async def hello_world(request: httpx.Request) -> httpx.Response:
return httpx.Response(200, text="Hello, world!")
--- /dev/null
+"""
+Integration tests for authentication.
+
+Unit tests for auth classes also exist in tests/test_auth.py
+"""
+
+import hashlib
+import netrc
+import os
+import sys
+import typing
+from threading import Lock
+from urllib.request import parse_keqv_list
+
+import pytest
+
+import httpx
+
+from ...common import FIXTURES_DIR
+
+
+class App:
+ """
+ A mock app to test auth credentials.
+ """
+
+ def __init__(self, auth_header: str = "", status_code: int = 200) -> None:
+ self.auth_header = auth_header
+ self.status_code = status_code
+
+ def __call__(self, request: httpx.Request) -> httpx.Response:
+ headers = {"www-authenticate": self.auth_header} if self.auth_header else {}
+ data = {"auth": request.headers.get("Authorization")}
+ return httpx.Response(self.status_code, headers=headers, json=data)
+
+
+class DigestApp:
+ def __init__(
+ self,
+ algorithm: str = "SHA-256",
+ send_response_after_attempt: int = 1,
+ qop: str = "auth",
+ regenerate_nonce: bool = True,
+ ) -> None:
+ self.algorithm = algorithm
+ self.send_response_after_attempt = send_response_after_attempt
+ self.qop = qop
+ self._regenerate_nonce = regenerate_nonce
+ self._response_count = 0
+
+ def __call__(self, request: httpx.Request) -> httpx.Response:
+ if self._response_count < self.send_response_after_attempt:
+ return self.challenge_send(request)
+
+ data = {"auth": request.headers.get("Authorization")}
+ return httpx.Response(200, json=data)
+
+ def challenge_send(self, request: httpx.Request) -> httpx.Response:
+ self._response_count += 1
+ nonce = (
+ hashlib.sha256(os.urandom(8)).hexdigest()
+ if self._regenerate_nonce
+ else "ee96edced2a0b43e4869e96ebe27563f369c1205a049d06419bb51d8aeddf3d3"
+ )
+ challenge_data = {
+ "nonce": nonce,
+ "qop": self.qop,
+ "opaque": (
+ "ee6378f3ee14ebfd2fff54b70a91a7c9390518047f242ab2271380db0e14bda1"
+ ),
+ "algorithm": self.algorithm,
+ "stale": "FALSE",
+ }
+ challenge_str = ", ".join(
+ '{}="{}"'.format(key, value)
+ for key, value in challenge_data.items()
+ if value
+ )
+
+ headers = {
+ "www-authenticate": f'Digest realm="httpx@example.org", {challenge_str}',
+ }
+ return httpx.Response(401, headers=headers)
+
+
+class RepeatAuth(httpx.Auth):
+ """
+ A mock authentication scheme that requires clients to send
+ the request a fixed number of times, and then send a last request containing
+ an aggregation of nonces that the server sent in 'WWW-Authenticate' headers
+ of intermediate responses.
+ """
+
+ requires_request_body = True
+
+ def __init__(self, repeat: int) -> None:
+ self.repeat = repeat
+
+ def auth_flow(
+ self, request: httpx.Request
+ ) -> typing.Generator[httpx.Request, httpx.Response, None]:
+ nonces = []
+
+ for index in range(self.repeat):
+ request.headers["Authorization"] = f"Repeat {index}"
+ response = yield request
+ nonces.append(response.headers["www-authenticate"])
+
+ key = ".".join(nonces)
+ request.headers["Authorization"] = f"Repeat {key}"
+ yield request
+
+
+class ResponseBodyAuth(httpx.Auth):
+ """
+ A mock authentication scheme that requires clients to send an 'Authorization'
+ header, then send back the contents of the response in the 'Authorization'
+ header.
+ """
+
+ requires_response_body = True
+
+ def __init__(self, token: str) -> None:
+ self.token = token
+
+ def auth_flow(
+ self, request: httpx.Request
+ ) -> typing.Generator[httpx.Request, httpx.Response, None]:
+ request.headers["Authorization"] = self.token
+ response = yield request
+ data = response.text
+ request.headers["Authorization"] = data
+ yield request
+
+
+class Auth(httpx.Auth):
+ """
+ A mock authentication scheme that uses a different implementation for the
+ sync and async cases.
+ """
+
+ def __init__(self) -> None:
+ self._lock = Lock()
+
+ def sync_auth_flow(self, request: httpx.Request) -> typing.Any:
+ with self._lock:
+ request.headers["Authorization"] = "auth"
+ yield request
+
+
+def test_basic_auth() -> None:
+ url = "https://example.org/"
+ auth = ("user", "password123")
+ app = App()
+
+ with httpx.Client(transport=httpx.MockTransport(app)) as client:
+ response = client.get(url, auth=auth)
+
+ assert response.status_code == 200
+ assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="}
+
+
+def test_basic_auth_with_stream() -> None:
+ """
+ See: https://github.com/encode/httpx/pull/1312
+ """
+ url = "https://example.org/"
+ auth = ("user", "password123")
+ app = App()
+
+ with httpx.Client(transport=httpx.MockTransport(app), auth=auth) as client:
+ with client.stream("GET", url) as response:
+ response.read()
+
+ assert response.status_code == 200
+ assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="}
+
+
+def test_basic_auth_in_url() -> None:
+ url = "https://user:password123@example.org/"
+ app = App()
+
+ with httpx.Client(transport=httpx.MockTransport(app)) as client:
+ response = client.get(url)
+
+ assert response.status_code == 200
+ assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="}
+
+
+def test_basic_auth_on_session() -> None:
+ url = "https://example.org/"
+ auth = ("user", "password123")
+ app = App()
+
+ with httpx.Client(transport=httpx.MockTransport(app), auth=auth) as client:
+ response = client.get(url)
+
+ assert response.status_code == 200
+ assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="}
+
+
+def test_custom_auth() -> None:
+ url = "https://example.org/"
+ app = App()
+
+ def auth(request: httpx.Request) -> httpx.Request:
+ request.headers["Authorization"] = "Token 123"
+ return request
+
+ with httpx.Client(transport=httpx.MockTransport(app)) as client:
+ response = client.get(url, auth=auth)
+
+ assert response.status_code == 200
+ assert response.json() == {"auth": "Token 123"}
+
+
+def test_netrc_auth_credentials_exist() -> None:
+ """
+ When netrc auth is being used and a request is made to a host that is
+ in the netrc file, then the relevant credentials should be applied.
+ """
+ netrc_file = str(FIXTURES_DIR / ".netrc")
+ url = "http://netrcexample.org"
+ app = App()
+ auth = httpx.NetRCAuth(netrc_file)
+
+ with httpx.Client(transport=httpx.MockTransport(app), auth=auth) as client:
+ response = client.get(url)
+
+ assert response.status_code == 200
+ assert response.json() == {
+ "auth": "Basic ZXhhbXBsZS11c2VybmFtZTpleGFtcGxlLXBhc3N3b3Jk"
+ }
+
+
+def test_netrc_auth_credentials_do_not_exist() -> None:
+ """
+ When netrc auth is being used and a request is made to a host that is
+ not in the netrc file, then no credentials should be applied.
+ """
+ netrc_file = str(FIXTURES_DIR / ".netrc")
+ url = "http://example.org"
+ app = App()
+ auth = httpx.NetRCAuth(netrc_file)
+
+ with httpx.Client(transport=httpx.MockTransport(app), auth=auth) as client:
+ response = client.get(url)
+
+ assert response.status_code == 200
+ assert response.json() == {"auth": None}
+
+
+@pytest.mark.skipif(
+ sys.version_info >= (3, 11),
+ reason="netrc files without a password are valid from Python >= 3.11",
+)
+def test_netrc_auth_nopassword_parse_error() -> None: # pragma: no cover
+ """
+ Python has different netrc parsing behaviours with different versions.
+ For Python < 3.11 a netrc file with no password is invalid. In this case
+ we want to allow the parse error to be raised.
+ """
+ netrc_file = str(FIXTURES_DIR / ".netrc-nopassword")
+ with pytest.raises(netrc.NetrcParseError):
+ httpx.NetRCAuth(netrc_file)
+
+
+def test_auth_disable_per_request() -> None:
+ url = "https://example.org/"
+ auth = ("user", "password123")
+ app = App()
+
+ with httpx.Client(transport=httpx.MockTransport(app), auth=auth) as client:
+ response = client.get(url, auth=None)
+
+ assert response.status_code == 200
+ assert response.json() == {"auth": None}
+
+
+def test_auth_hidden_url() -> None:
+ url = "http://example-username:example-password@example.org/"
+ expected = "URL('http://example-username:[secure]@example.org/')"
+ assert url == httpx.URL(url)
+ assert expected == repr(httpx.URL(url))
+
+
+def test_auth_hidden_header() -> None:
+ url = "https://example.org/"
+ auth = ("example-username", "example-password")
+ app = App()
+
+ with httpx.Client(transport=httpx.MockTransport(app)) as client:
+ response = client.get(url, auth=auth)
+
+ assert "'authorization': '[secure]'" in str(response.request.headers)
+
+
+def test_auth_property() -> None:
+ app = App()
+
+ with httpx.Client(transport=httpx.MockTransport(app)) as client:
+ assert client.auth is None
+
+ client.auth = ("user", "password123") # type: ignore
+ assert isinstance(client.auth, httpx.BasicAuth)
+
+ url = "https://example.org/"
+ response = client.get(url)
+ assert response.status_code == 200
+ assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="}
+
+
+def test_auth_invalid_type() -> None:
+ app = App()
+
+ with pytest.raises(TypeError):
+ client = httpx.Client(
+ transport=httpx.MockTransport(app),
+ auth="not a tuple, not a callable", # type: ignore
+ )
+
+ with httpx.Client(transport=httpx.MockTransport(app)) as client:
+ with pytest.raises(TypeError):
+ client.get(auth="not a tuple, not a callable") # type: ignore
+
+ with pytest.raises(TypeError):
+ client.auth = "not a tuple, not a callable" # type: ignore
+
+
+def test_digest_auth_returns_no_auth_if_no_digest_header_in_response() -> None:
+ url = "https://example.org/"
+ auth = httpx.DigestAuth(username="user", password="password123")
+ app = App()
+
+ with httpx.Client(transport=httpx.MockTransport(app)) as client:
+ response = client.get(url, auth=auth)
+
+ assert response.status_code == 200
+ assert response.json() == {"auth": None}
+ assert len(response.history) == 0
+
+
+def test_digest_auth_returns_no_auth_if_alternate_auth_scheme() -> None:
+ url = "https://example.org/"
+ auth = httpx.DigestAuth(username="user", password="password123")
+ auth_header = "Token ..."
+ app = App(auth_header=auth_header, status_code=401)
+
+ with httpx.Client(transport=httpx.MockTransport(app)) as client:
+ response = client.get(url, auth=auth)
+
+ assert response.status_code == 401
+ assert response.json() == {"auth": None}
+ assert len(response.history) == 0
+
+
+def test_digest_auth_200_response_including_digest_auth_header() -> None:
+ url = "https://example.org/"
+ auth = httpx.DigestAuth(username="user", password="password123")
+ auth_header = 'Digest realm="realm@host.com",qop="auth",nonce="abc",opaque="xyz"'
+ app = App(auth_header=auth_header, status_code=200)
+
+ with httpx.Client(transport=httpx.MockTransport(app)) as client:
+ response = client.get(url, auth=auth)
+
+ assert response.status_code == 200
+ assert response.json() == {"auth": None}
+ assert len(response.history) == 0
+
+
+def test_digest_auth_401_response_without_digest_auth_header() -> None:
+ url = "https://example.org/"
+ auth = httpx.DigestAuth(username="user", password="password123")
+ app = App(auth_header="", status_code=401)
+
+ with httpx.Client(transport=httpx.MockTransport(app)) as client:
+ response = client.get(url, auth=auth)
+
+ assert response.status_code == 401
+ assert response.json() == {"auth": None}
+ assert len(response.history) == 0
+
+
+@pytest.mark.parametrize(
+ "algorithm,expected_hash_length,expected_response_length",
+ [
+ ("MD5", 64, 32),
+ ("MD5-SESS", 64, 32),
+ ("SHA", 64, 40),
+ ("SHA-SESS", 64, 40),
+ ("SHA-256", 64, 64),
+ ("SHA-256-SESS", 64, 64),
+ ("SHA-512", 64, 128),
+ ("SHA-512-SESS", 64, 128),
+ ],
+)
+def test_digest_auth(
+ algorithm: str, expected_hash_length: int, expected_response_length: int
+) -> None:
+ url = "https://example.org/"
+ auth = httpx.DigestAuth(username="user", password="password123")
+ app = DigestApp(algorithm=algorithm)
+
+ with httpx.Client(transport=httpx.MockTransport(app)) as client:
+ response = client.get(url, auth=auth)
+
+ assert response.status_code == 200
+ assert len(response.history) == 1
+
+ authorization = typing.cast(typing.Dict[str, typing.Any], response.json())["auth"]
+ scheme, _, fields = authorization.partition(" ")
+ assert scheme == "Digest"
+
+ response_fields = [field.strip() for field in fields.split(",")]
+ digest_data = dict(field.split("=") for field in response_fields)
+
+ assert digest_data["username"] == '"user"'
+ assert digest_data["realm"] == '"httpx@example.org"'
+ assert "nonce" in digest_data
+ assert digest_data["uri"] == '"/"'
+ assert len(digest_data["response"]) == expected_response_length + 2 # extra quotes
+ assert len(digest_data["opaque"]) == expected_hash_length + 2
+ assert digest_data["algorithm"] == algorithm
+ assert digest_data["qop"] == "auth"
+ assert digest_data["nc"] == "00000001"
+ assert len(digest_data["cnonce"]) == 16 + 2
+
+
+def test_digest_auth_no_specified_qop() -> None:
+ url = "https://example.org/"
+ auth = httpx.DigestAuth(username="user", password="password123")
+ app = DigestApp(qop="")
+
+ with httpx.Client(transport=httpx.MockTransport(app)) as client:
+ response = client.get(url, auth=auth)
+
+ assert response.status_code == 200
+ assert len(response.history) == 1
+
+ authorization = typing.cast(typing.Dict[str, typing.Any], response.json())["auth"]
+ scheme, _, fields = authorization.partition(" ")
+ assert scheme == "Digest"
+
+ response_fields = [field.strip() for field in fields.split(",")]
+ digest_data = dict(field.split("=") for field in response_fields)
+
+ assert "qop" not in digest_data
+ assert "nc" not in digest_data
+ assert "cnonce" not in digest_data
+ assert digest_data["username"] == '"user"'
+ assert digest_data["realm"] == '"httpx@example.org"'
+ assert len(digest_data["nonce"]) == 64 + 2 # extra quotes
+ assert digest_data["uri"] == '"/"'
+ assert len(digest_data["response"]) == 64 + 2
+ assert len(digest_data["opaque"]) == 64 + 2
+ assert digest_data["algorithm"] == "SHA-256"
+
+
+@pytest.mark.parametrize("qop", ("auth, auth-int", "auth,auth-int", "unknown,auth"))
+def test_digest_auth_qop_including_spaces_and_auth_returns_auth(qop: str) -> None:
+ url = "https://example.org/"
+ auth = httpx.DigestAuth(username="user", password="password123")
+ app = DigestApp(qop=qop)
+
+ with httpx.Client(transport=httpx.MockTransport(app)) as client:
+ response = client.get(url, auth=auth)
+
+ assert response.status_code == 200
+ assert len(response.history) == 1
+
+
+def test_digest_auth_qop_auth_int_not_implemented() -> None:
+ url = "https://example.org/"
+ auth = httpx.DigestAuth(username="user", password="password123")
+ app = DigestApp(qop="auth-int")
+
+ with httpx.Client(transport=httpx.MockTransport(app)) as client:
+ with pytest.raises(NotImplementedError):
+ client.get(url, auth=auth)
+
+
+def test_digest_auth_qop_must_be_auth_or_auth_int() -> None:
+ url = "https://example.org/"
+ auth = httpx.DigestAuth(username="user", password="password123")
+ app = DigestApp(qop="not-auth")
+
+ with httpx.Client(transport=httpx.MockTransport(app)) as client:
+ with pytest.raises(httpx.ProtocolError):
+ client.get(url, auth=auth)
+
+
+def test_digest_auth_incorrect_credentials() -> None:
+ url = "https://example.org/"
+ auth = httpx.DigestAuth(username="user", password="password123")
+ app = DigestApp(send_response_after_attempt=2)
+
+ with httpx.Client(transport=httpx.MockTransport(app)) as client:
+ response = client.get(url, auth=auth)
+
+ assert response.status_code == 401
+ assert len(response.history) == 1
+
+
+def test_digest_auth_reuses_challenge() -> None:
+ url = "https://example.org/"
+ auth = httpx.DigestAuth(username="user", password="password123")
+ app = DigestApp()
+
+ with httpx.Client(transport=httpx.MockTransport(app)) as client:
+ response_1 = client.get(url, auth=auth)
+ response_2 = client.get(url, auth=auth)
+
+ assert response_1.status_code == 200
+ assert response_2.status_code == 200
+
+ assert len(response_1.history) == 1
+ assert len(response_2.history) == 0
+
+
+def test_digest_auth_resets_nonce_count_after_401() -> None:
+ url = "https://example.org/"
+ auth = httpx.DigestAuth(username="user", password="password123")
+ app = DigestApp()
+
+ with httpx.Client(transport=httpx.MockTransport(app)) as client:
+ response_1 = client.get(url, auth=auth)
+ assert response_1.status_code == 200
+ assert len(response_1.history) == 1
+
+ first_nonce = parse_keqv_list(
+ response_1.request.headers["Authorization"].split(", ")
+ )["nonce"]
+ first_nc = parse_keqv_list(
+ response_1.request.headers["Authorization"].split(", ")
+ )["nc"]
+
+ # with this we now force a 401 on a subsequent (but initial) request
+ app.send_response_after_attempt = 2
+
+ # we expect the client again to try to authenticate,
+ # i.e. the history length must be 1
+ response_2 = client.get(url, auth=auth)
+ assert response_2.status_code == 200
+ assert len(response_2.history) == 1
+
+ second_nonce = parse_keqv_list(
+ response_2.request.headers["Authorization"].split(", ")
+ )["nonce"]
+ second_nc = parse_keqv_list(
+ response_2.request.headers["Authorization"].split(", ")
+ )["nc"]
+
+ assert first_nonce != second_nonce # ensures that the auth challenge was reset
+ assert (
+ first_nc == second_nc
+ ) # ensures the nonce count is reset when the authentication failed
+
+
+@pytest.mark.parametrize(
+ "auth_header",
+ [
+ 'Digest realm="httpx@example.org", qop="auth"', # missing fields
+ 'Digest realm="httpx@example.org", qop="auth,au', # malformed fields list
+ ],
+)
+def test_digest_auth_raises_protocol_error_on_malformed_header(
+ auth_header: str,
+) -> None:
+ url = "https://example.org/"
+ auth = httpx.DigestAuth(username="user", password="password123")
+ app = App(auth_header=auth_header, status_code=401)
+
+ with httpx.Client(transport=httpx.MockTransport(app)) as client:
+ with pytest.raises(httpx.ProtocolError):
+ client.get(url, auth=auth)
+
+
+def test_auth_history() -> None:
+ """
+ Test that intermediate requests sent as part of an authentication flow
+ are recorded in the response history.
+ """
+ url = "https://example.org/"
+ auth = RepeatAuth(repeat=2)
+ app = App(auth_header="abc")
+
+ with httpx.Client(transport=httpx.MockTransport(app)) as client:
+ response = client.get(url, auth=auth)
+
+ assert response.status_code == 200
+ assert response.json() == {"auth": "Repeat abc.abc"}
+
+ assert len(response.history) == 2
+ resp1, resp2 = response.history
+ assert resp1.json() == {"auth": "Repeat 0"}
+ assert resp2.json() == {"auth": "Repeat 1"}
+
+ assert len(resp2.history) == 1
+ assert resp2.history == [resp1]
+
+ assert len(resp1.history) == 0
+
+
+class ConsumeBodyTransport(httpx.MockTransport):
+ def handle_request(self, request: httpx.Request) -> httpx.Response:
+ assert isinstance(request.stream, httpx.SyncByteStream)
+ for _ in request.stream:
+ pass
+ return self.handler(request) # type: ignore[return-value]
+
+
+def test_digest_auth_unavailable_streaming_body():
+ url = "https://example.org/"
+ auth = httpx.DigestAuth(username="user", password="password123")
+ app = DigestApp()
+
+ def streaming_body() -> typing.Iterator[bytes]:
+ yield b"Example request body" # pragma: no cover
+
+ with httpx.Client(transport=ConsumeBodyTransport(app)) as client:
+ with pytest.raises(httpx.StreamConsumed):
+ client.post(url, content=streaming_body(), auth=auth)
+
+
+def test_auth_reads_response_body() -> None:
+ """
+ Test that we can read the response body in an auth flow if `requires_response_body`
+ is set.
+ """
+ url = "https://example.org/"
+ auth = ResponseBodyAuth("xyz")
+ app = App()
+
+ with httpx.Client(transport=httpx.MockTransport(app)) as client:
+ response = client.get(url, auth=auth)
+
+ assert response.status_code == 200
+ assert response.json() == {"auth": '{"auth":"xyz"}'}
+
+
+def test_auth() -> None:
+ """
+ Test that we can use an auth implementation specific to the async case, to
+ support cases that require performing I/O or using concurrency primitives (such
+ as checking a disk-based cache or fetching a token from a remote auth server).
+ """
+ url = "https://example.org/"
+ auth = Auth()
+ app = App()
+
+ with httpx.Client(transport=httpx.MockTransport(app)) as client:
+ response = client.get(url, auth=auth)
+
+ assert response.status_code == 200
+ assert response.json() == {"auth": "auth"}
--- /dev/null
+from __future__ import annotations
+
+import typing
+from datetime import timedelta
+
+import pytest
+
+import httpx
+
+
+def test_get(server):
+ url = server.url
+ with httpx.Client(http2=True) as client:
+ response = client.get(url)
+ assert response.status_code == 200
+ assert response.text == "Hello, world!"
+ assert response.http_version == "HTTP/1.1"
+ assert response.headers
+ assert repr(response) == "<Response [200 OK]>"
+ assert response.elapsed > timedelta(seconds=0)
+
+
+@pytest.mark.parametrize(
+ "url",
+ [
+ pytest.param("invalid://example.org", id="scheme-not-http(s)"),
+ pytest.param("://example.org", id="no-scheme"),
+ pytest.param("http://", id="no-host"),
+ ],
+)
+def test_get_invalid_url(server, url):
+ with httpx.Client() as client:
+ with pytest.raises((httpx.UnsupportedProtocol, httpx.LocalProtocolError)):
+ client.get(url)
+
+
+def test_build_request(server):
+ url = server.url.copy_with(path="/echo_headers")
+ headers = {"Custom-header": "value"}
+ with httpx.Client() as client:
+ request = client.build_request("GET", url)
+ request.headers.update(headers)
+ response = client.send(request)
+
+ assert response.status_code == 200
+ assert response.url == url
+
+ assert response.json()["Custom-header"] == "value"
+
+
+def test_post(server):
+ url = server.url
+ with httpx.Client() as client:
+ response = client.post(url, content=b"Hello, world!")
+ assert response.status_code == 200
+
+
+def test_post_json(server):
+ url = server.url
+ with httpx.Client() as client:
+ response = client.post(url, json={"text": "Hello, world!"})
+ assert response.status_code == 200
+
+
+def test_stream_response(server):
+ with httpx.Client() as client:
+ with client.stream("GET", server.url) as response:
+ body = response.read()
+
+ assert response.status_code == 200
+ assert body == b"Hello, world!"
+ assert response.content == b"Hello, world!"
+
+
+def test_access_content_stream_response(server):
+ with httpx.Client() as client:
+ with client.stream("GET", server.url) as response:
+ pass
+
+ assert response.status_code == 200
+ with pytest.raises(httpx.ResponseNotRead):
+ response.content # noqa: B018
+
+
+def test_stream_request(server):
+ def hello_world() -> typing.Iterator[bytes]:
+ yield b"Hello, "
+ yield b"world!"
+
+ with httpx.Client() as client:
+ response = client.post(server.url, content=hello_world())
+ assert response.status_code == 200
+
+
+def test_raise_for_status(server):
+ with httpx.Client() as client:
+ for status_code in (200, 400, 404, 500, 505):
+ response = client.request(
+ "GET", server.url.copy_with(path=f"/status/{status_code}")
+ )
+
+ if 400 <= status_code < 600:
+ with pytest.raises(httpx.HTTPStatusError) as exc_info:
+ response.raise_for_status()
+ assert exc_info.value.response == response
+ else:
+ assert response.raise_for_status() is response
+
+
+def test_options(server):
+ with httpx.Client() as client:
+ response = client.options(server.url)
+ assert response.status_code == 200
+ assert response.text == "Hello, world!"
+
+
+def test_head(server):
+ with httpx.Client() as client:
+ response = client.head(server.url)
+ assert response.status_code == 200
+ assert response.text == ""
+
+
+def test_put(server):
+ with httpx.Client() as client:
+ response = client.put(server.url, content=b"Hello, world!")
+ assert response.status_code == 200
+
+
+def test_patch(server):
+ with httpx.Client() as client:
+ response = client.patch(server.url, content=b"Hello, world!")
+ assert response.status_code == 200
+
+
+def test_delete(server):
+ with httpx.Client() as client:
+ response = client.delete(server.url)
+ assert response.status_code == 200
+ assert response.text == "Hello, world!"
+
+
+def test_100_continue(server):
+ headers = {"Expect": "100-continue"}
+ content = b"Echo request body"
+
+ with httpx.Client() as client:
+ response = client.post(
+ server.url.copy_with(path="/echo_body"), headers=headers, content=content
+ )
+
+ assert response.status_code == 200
+ assert response.content == content
+
+
+def test_context_managed_transport():
+ class Transport(httpx.BaseTransport):
+ def __init__(self) -> None:
+ self.events: list[str] = []
+
+ def close(self):
+ # The base implementation of httpx.BaseTransport just
+ # calls into `.close`, so simple transport cases can just override
+ # this method for any cleanup, where more complex cases
+ # might want to additionally override `__enter__`/`__exit__`.
+ self.events.append("transport.close")
+
+ def __enter__(self):
+ super().__enter__()
+ self.events.append("transport.__enter__")
+
+ def __exit__(self, *args):
+ super().__exit__(*args)
+ self.events.append("transport.__exit__")
+
+ transport = Transport()
+ with httpx.Client(transport=transport):
+ pass
+
+ assert transport.events == [
+ "transport.__enter__",
+ "transport.close",
+ "transport.__exit__",
+ ]
+
+
+def test_context_managed_transport_and_mount():
+ class Transport(httpx.BaseTransport):
+ def __init__(self, name: str) -> None:
+ self.name: str = name
+ self.events: list[str] = []
+
+ def close(self):
+ # The base implementation of httpx.BaseTransport just
+ # calls into `.close`, so simple transport cases can just override
+ # this method for any cleanup, where more complex cases
+ # might want to additionally override `__enter__`/`__exit__`.
+ self.events.append(f"{self.name}.close")
+
+ def __enter__(self):
+ super().__enter__()
+ self.events.append(f"{self.name}.__enter__")
+
+ def __exit__(self, *args):
+ super().__exit__(*args)
+ self.events.append(f"{self.name}.__exit__")
+
+ transport = Transport(name="transport")
+ mounted = Transport(name="mounted")
+ with httpx.Client(transport=transport, mounts={"http://www.example.org": mounted}):
+ pass
+
+ assert transport.events == [
+ "transport.__enter__",
+ "transport.close",
+ "transport.__exit__",
+ ]
+ assert mounted.events == [
+ "mounted.__enter__",
+ "mounted.close",
+ "mounted.__exit__",
+ ]
+
+
+def hello_world(request):
+ return httpx.Response(200, text="Hello, world!")
+
+
+def test_client_closed_state_using_implicit_open():
+ client = httpx.Client(transport=httpx.MockTransport(hello_world))
+
+ assert not client.is_closed
+ client.get("http://example.com")
+
+ assert not client.is_closed
+ client.close()
+
+ assert client.is_closed
+ # Once we're close we cannot make any more requests.
+ with pytest.raises(RuntimeError):
+ client.get("http://example.com")
+
+ # Once we're closed we cannot reopen the client.
+ with pytest.raises(RuntimeError):
+ with client:
+ pass # pragma: no cover
+
+
+def test_client_closed_state_using_with_block():
+ with httpx.Client(transport=httpx.MockTransport(hello_world)) as client:
+ assert not client.is_closed
+ client.get("http://example.com")
+
+ assert client.is_closed
+ with pytest.raises(RuntimeError):
+ client.get("http://example.com")
+
+
+def unmounted(request: httpx.Request) -> httpx.Response:
+ data = {"app": "unmounted"}
+ return httpx.Response(200, json=data)
+
+
+def mounted(request: httpx.Request) -> httpx.Response:
+ data = {"app": "mounted"}
+ return httpx.Response(200, json=data)
+
+
+def test_mounted_transport():
+ transport = httpx.MockTransport(unmounted)
+ mounts = {"custom://": httpx.MockTransport(mounted)}
+
+ with httpx.Client(transport=transport, mounts=mounts) as client:
+ response = client.get("https://www.example.com")
+ assert response.status_code == 200
+ assert response.json() == {"app": "unmounted"}
+
+ response = client.get("custom://www.example.com")
+ assert response.status_code == 200
+ assert response.json() == {"app": "mounted"}
+
+
+def test_mock_transport():
+ def hello_world(request: httpx.Request) -> httpx.Response:
+ return httpx.Response(200, text="Hello, world!")
+
+ transport = httpx.MockTransport(hello_world)
+
+ with httpx.Client(transport=transport) as client:
+ response = client.get("https://www.example.com")
+ assert response.status_code == 200
+ assert response.text == "Hello, world!"
+
+
+def test_cancellation_during_stream():
+ """
+ If any BaseException is raised during streaming the response, then the
+ stream should be closed.
+
+ This includes:
+
+ * `asyncio.CancelledError` (A subclass of BaseException from Python 3.8 onwards.)
+ * `trio.Cancelled`
+ * `KeyboardInterrupt`
+ * `SystemExit`
+
+ See https://github.com/encode/httpx/issues/2139
+ """
+ stream_was_closed = False
+
+ def response_with_cancel_during_stream(request):
+ class CancelledStream(httpx.SyncByteStream):
+ def __iter__(self) -> typing.Iterator[bytes]:
+ yield b"Hello"
+ raise KeyboardInterrupt()
+ yield b", world" # pragma: no cover
+
+ def close(self) -> None:
+ nonlocal stream_was_closed
+ stream_was_closed = True
+
+ return httpx.Response(
+ 200, headers={"Content-Length": "12"}, stream=CancelledStream()
+ )
+
+ transport = httpx.MockTransport(response_with_cancel_during_stream)
+
+ with httpx.Client(transport=transport) as client:
+ with pytest.raises(KeyboardInterrupt):
+ client.get("https://www.example.com")
+ assert stream_was_closed
+
+
+def test_server_extensions(server):
+ url = server.url
+ with httpx.Client(http2=True) as client:
+ response = client.get(url)
+ assert response.status_code == 200
+ assert response.extensions["http_version"] == b"HTTP/1.1"
--- /dev/null
+from http.cookiejar import Cookie, CookieJar
+
+import pytest
+
+import httpx
+
+
+def get_and_set_cookies(request: httpx.Request) -> httpx.Response:
+ if request.url.path == "/echo_cookies":
+ data = {"cookies": request.headers.get("cookie")}
+ return httpx.Response(200, json=data)
+ elif request.url.path == "/set_cookie":
+ return httpx.Response(200, headers={"set-cookie": "example-name=example-value"})
+ else:
+ raise NotImplementedError() # pragma: no cover
+
+
+def test_set_cookie() -> None:
+ """
+ Send a request including a cookie.
+ """
+ url = "http://example.org/echo_cookies"
+ cookies = {"example-name": "example-value"}
+
+ with httpx.Client(
+ cookies=cookies, transport=httpx.MockTransport(get_and_set_cookies)
+ ) as client:
+ response = client.get(url)
+
+ assert response.status_code == 200
+ assert response.json() == {"cookies": "example-name=example-value"}
+
+
+def test_set_per_request_cookie_is_deprecated() -> None:
+ """
+ Sending a request including a per-request cookie is deprecated.
+ """
+ url = "http://example.org/echo_cookies"
+ cookies = {"example-name": "example-value"}
+
+ with httpx.Client(transport=httpx.MockTransport(get_and_set_cookies)) as client:
+ with pytest.warns(DeprecationWarning):
+ response = client.get(url, cookies=cookies)
+
+ assert response.status_code == 200
+ assert response.json() == {"cookies": "example-name=example-value"}
+
+
+def test_set_cookie_with_cookiejar() -> None:
+ """
+ Send a request including a cookie, using a `CookieJar` instance.
+ """
+
+ url = "http://example.org/echo_cookies"
+ cookies = CookieJar()
+ cookie = Cookie(
+ version=0,
+ name="example-name",
+ value="example-value",
+ port=None,
+ port_specified=False,
+ domain="",
+ domain_specified=False,
+ domain_initial_dot=False,
+ path="/",
+ path_specified=True,
+ secure=False,
+ expires=None,
+ discard=True,
+ comment=None,
+ comment_url=None,
+ rest={"HttpOnly": ""},
+ rfc2109=False,
+ )
+ cookies.set_cookie(cookie)
+
+ with httpx.Client(
+ cookies=cookies, transport=httpx.MockTransport(get_and_set_cookies)
+ ) as client:
+ response = client.get(url)
+
+ assert response.status_code == 200
+ assert response.json() == {"cookies": "example-name=example-value"}
+
+
+def test_setting_client_cookies_to_cookiejar() -> None:
+ """
+ Send a request including a cookie, using a `CookieJar` instance.
+ """
+
+ url = "http://example.org/echo_cookies"
+ cookies = CookieJar()
+ cookie = Cookie(
+ version=0,
+ name="example-name",
+ value="example-value",
+ port=None,
+ port_specified=False,
+ domain="",
+ domain_specified=False,
+ domain_initial_dot=False,
+ path="/",
+ path_specified=True,
+ secure=False,
+ expires=None,
+ discard=True,
+ comment=None,
+ comment_url=None,
+ rest={"HttpOnly": ""},
+ rfc2109=False,
+ )
+ cookies.set_cookie(cookie)
+
+ with httpx.Client(
+ cookies=cookies, transport=httpx.MockTransport(get_and_set_cookies)
+ ) as client:
+ response = client.get(url)
+
+ assert response.status_code == 200
+ assert response.json() == {"cookies": "example-name=example-value"}
+
+
+def test_set_cookie_with_cookies_model() -> None:
+ """
+ Send a request including a cookie, using a `Cookies` instance.
+ """
+
+ url = "http://example.org/echo_cookies"
+ cookies = httpx.Cookies()
+ cookies["example-name"] = "example-value"
+
+ with httpx.Client(transport=httpx.MockTransport(get_and_set_cookies)) as client:
+ client.cookies = cookies
+ response = client.get(url)
+
+ assert response.status_code == 200
+ assert response.json() == {"cookies": "example-name=example-value"}
+
+
+def test_get_cookie() -> None:
+ url = "http://example.org/set_cookie"
+
+ with httpx.Client(transport=httpx.MockTransport(get_and_set_cookies)) as client:
+ response = client.get(url)
+
+ assert response.status_code == 200
+ assert response.cookies["example-name"] == "example-value"
+ assert client.cookies["example-name"] == "example-value"
+
+
+def test_cookie_persistence() -> None:
+ """
+ Ensure that Client instances persist cookies between requests.
+ """
+ with httpx.Client(transport=httpx.MockTransport(get_and_set_cookies)) as client:
+ response = client.get("http://example.org/echo_cookies")
+ assert response.status_code == 200
+ assert response.json() == {"cookies": None}
+
+ response = client.get("http://example.org/set_cookie")
+ assert response.status_code == 200
+ assert response.cookies["example-name"] == "example-value"
+ assert client.cookies["example-name"] == "example-value"
+
+ response = client.get("http://example.org/echo_cookies")
+ assert response.status_code == 200
+ assert response.json() == {"cookies": "example-name=example-value"}
--- /dev/null
+import httpx
+
+
+def app(request: httpx.Request) -> httpx.Response:
+ if request.url.path == "/redirect":
+ return httpx.Response(303, headers={"server": "testserver", "location": "/"})
+ elif request.url.path.startswith("/status/"):
+ status_code = int(request.url.path[-3:])
+ return httpx.Response(status_code, headers={"server": "testserver"})
+
+ return httpx.Response(200, headers={"server": "testserver"})
+
+
+def test_event_hooks():
+ events = []
+
+ def on_request(request):
+ events.append({"event": "request", "headers": dict(request.headers)})
+
+ def on_response(response):
+ events.append({"event": "response", "headers": dict(response.headers)})
+
+ event_hooks = {"request": [on_request], "response": [on_response]}
+
+ with httpx.Client(
+ event_hooks=event_hooks, transport=httpx.MockTransport(app)
+ ) as http:
+ http.get("http://127.0.0.1:8000/", auth=("username", "password"))
+
+ assert events == [
+ {
+ "event": "request",
+ "headers": {
+ "host": "127.0.0.1:8000",
+ "user-agent": f"python-httpx/{httpx.__version__}",
+ "accept": "*/*",
+ "accept-encoding": "gzip, deflate, br, zstd",
+ "connection": "keep-alive",
+ "authorization": "Basic dXNlcm5hbWU6cGFzc3dvcmQ=",
+ },
+ },
+ {
+ "event": "response",
+ "headers": {"server": "testserver"},
+ },
+ ]
+
+
+def test_event_hooks_raising_exception():
+ def raise_on_4xx_5xx(response):
+ response.raise_for_status()
+
+ event_hooks = {"response": [raise_on_4xx_5xx]}
+
+ with httpx.Client(
+ event_hooks=event_hooks, transport=httpx.MockTransport(app)
+ ) as http:
+ try:
+ http.get("http://127.0.0.1:8000/status/400")
+ except httpx.HTTPStatusError as exc:
+ assert exc.response.is_closed
+
+
+def test_event_hooks_with_redirect():
+ """
+ A redirect request should trigger additional 'request' and 'response' event hooks.
+ """
+
+ events = []
+
+ def on_request(request):
+ events.append({"event": "request", "headers": dict(request.headers)})
+
+ def on_response(response):
+ events.append({"event": "response", "headers": dict(response.headers)})
+
+ event_hooks = {"request": [on_request], "response": [on_response]}
+
+ with httpx.Client(
+ event_hooks=event_hooks,
+ transport=httpx.MockTransport(app),
+ follow_redirects=True,
+ ) as http:
+ http.get("http://127.0.0.1:8000/redirect", auth=("username", "password"))
+
+ assert events == [
+ {
+ "event": "request",
+ "headers": {
+ "host": "127.0.0.1:8000",
+ "user-agent": f"python-httpx/{httpx.__version__}",
+ "accept": "*/*",
+ "accept-encoding": "gzip, deflate, br, zstd",
+ "connection": "keep-alive",
+ "authorization": "Basic dXNlcm5hbWU6cGFzc3dvcmQ=",
+ },
+ },
+ {
+ "event": "response",
+ "headers": {"location": "/", "server": "testserver"},
+ },
+ {
+ "event": "request",
+ "headers": {
+ "host": "127.0.0.1:8000",
+ "user-agent": f"python-httpx/{httpx.__version__}",
+ "accept": "*/*",
+ "accept-encoding": "gzip, deflate, br, zstd",
+ "connection": "keep-alive",
+ "authorization": "Basic dXNlcm5hbWU6cGFzc3dvcmQ=",
+ },
+ },
+ {
+ "event": "response",
+ "headers": {"server": "testserver"},
+ },
+ ]
--- /dev/null
+#!/usr/bin/env python3
+
+import pytest
+
+import httpx
+
+
+def echo_headers(request: httpx.Request) -> httpx.Response:
+ data = {"headers": dict(request.headers)}
+ return httpx.Response(200, json=data)
+
+
+def echo_repeated_headers_multi_items(request: httpx.Request) -> httpx.Response:
+ data = {"headers": list(request.headers.multi_items())}
+ return httpx.Response(200, json=data)
+
+
+def echo_repeated_headers_items(request: httpx.Request) -> httpx.Response:
+ data = {"headers": list(request.headers.items())}
+ return httpx.Response(200, json=data)
+
+
+def test_client_header():
+ """
+ Set a header in the Client.
+ """
+ url = "http://example.org/echo_headers"
+ headers = {"Example-Header": "example-value"}
+
+ with httpx.Client(
+ transport=httpx.MockTransport(echo_headers), headers=headers
+ ) as client:
+ response = client.get(url)
+
+ assert response.status_code == 200
+ assert response.json() == {
+ "headers": {
+ "accept": "*/*",
+ "accept-encoding": "gzip, deflate, br, zstd",
+ "connection": "keep-alive",
+ "example-header": "example-value",
+ "host": "example.org",
+ "user-agent": f"python-httpx/{httpx.__version__}",
+ }
+ }
+
+
+def test_header_merge():
+ url = "http://example.org/echo_headers"
+ client_headers = {"User-Agent": "python-myclient/0.2.1"}
+ request_headers = {"X-Auth-Token": "FooBarBazToken"}
+ with httpx.Client(
+ transport=httpx.MockTransport(echo_headers), headers=client_headers
+ ) as client:
+ response = client.get(url, headers=request_headers)
+
+ assert response.status_code == 200
+ assert response.json() == {
+ "headers": {
+ "accept": "*/*",
+ "accept-encoding": "gzip, deflate, br, zstd",
+ "connection": "keep-alive",
+ "host": "example.org",
+ "user-agent": "python-myclient/0.2.1",
+ "x-auth-token": "FooBarBazToken",
+ }
+ }
+
+
+def test_header_merge_conflicting_headers():
+ url = "http://example.org/echo_headers"
+ client_headers = {"X-Auth-Token": "FooBar"}
+ request_headers = {"X-Auth-Token": "BazToken"}
+ with httpx.Client(
+ transport=httpx.MockTransport(echo_headers), headers=client_headers
+ ) as client:
+ response = client.get(url, headers=request_headers)
+
+ assert response.status_code == 200
+ assert response.json() == {
+ "headers": {
+ "accept": "*/*",
+ "accept-encoding": "gzip, deflate, br, zstd",
+ "connection": "keep-alive",
+ "host": "example.org",
+ "user-agent": f"python-httpx/{httpx.__version__}",
+ "x-auth-token": "BazToken",
+ }
+ }
+
+
+def test_header_update():
+ url = "http://example.org/echo_headers"
+ with httpx.Client(transport=httpx.MockTransport(echo_headers)) as client:
+ first_response = client.get(url)
+ client.headers.update(
+ {"User-Agent": "python-myclient/0.2.1", "Another-Header": "AThing"}
+ )
+ second_response = client.get(url)
+
+ assert first_response.status_code == 200
+ assert first_response.json() == {
+ "headers": {
+ "accept": "*/*",
+ "accept-encoding": "gzip, deflate, br, zstd",
+ "connection": "keep-alive",
+ "host": "example.org",
+ "user-agent": f"python-httpx/{httpx.__version__}",
+ }
+ }
+
+ assert second_response.status_code == 200
+ assert second_response.json() == {
+ "headers": {
+ "accept": "*/*",
+ "accept-encoding": "gzip, deflate, br, zstd",
+ "another-header": "AThing",
+ "connection": "keep-alive",
+ "host": "example.org",
+ "user-agent": "python-myclient/0.2.1",
+ }
+ }
+
+
+def test_header_repeated_items():
+ url = "http://example.org/echo_headers"
+ with httpx.Client(
+ transport=httpx.MockTransport(echo_repeated_headers_items)
+ ) as client:
+ response = client.get(url, headers=[("x-header", "1"), ("x-header", "2,3")])
+
+ assert response.status_code == 200
+
+ echoed_headers = response.json()["headers"]
+ # as per RFC 7230, the whitespace after a comma is insignificant
+ # so we split and strip here so that we can do a safe comparison
+ assert ["x-header", ["1", "2", "3"]] in [
+ [k, [subv.lstrip() for subv in v.split(",")]] for k, v in echoed_headers
+ ]
+
+
+def test_header_repeated_multi_items():
+ url = "http://example.org/echo_headers"
+ with httpx.Client(
+ transport=httpx.MockTransport(echo_repeated_headers_multi_items)
+ ) as client:
+ response = client.get(url, headers=[("x-header", "1"), ("x-header", "2,3")])
+
+ assert response.status_code == 200
+
+ echoed_headers = response.json()["headers"]
+ assert ["x-header", "1"] in echoed_headers
+ assert ["x-header", "2,3"] in echoed_headers
+
+
+def test_remove_default_header():
+ """
+ Remove a default header from the Client.
+ """
+ url = "http://example.org/echo_headers"
+
+ with httpx.Client(transport=httpx.MockTransport(echo_headers)) as client:
+ del client.headers["User-Agent"]
+
+ response = client.get(url)
+
+ assert response.status_code == 200
+ assert response.json() == {
+ "headers": {
+ "accept": "*/*",
+ "accept-encoding": "gzip, deflate, br, zstd",
+ "connection": "keep-alive",
+ "host": "example.org",
+ }
+ }
+
+
+def test_header_does_not_exist():
+ headers = httpx.Headers({"foo": "bar"})
+ with pytest.raises(KeyError):
+ del headers["baz"]
+
+
+def test_header_with_incorrect_value():
+ with pytest.raises(
+ TypeError,
+ match=f"Header value must be str or bytes, not {type(None)}",
+ ):
+ httpx.Headers({"foo": None}) # type: ignore
+
+
+def test_host_with_auth_and_port_in_url():
+ """
+ The Host header should only include the hostname, or hostname:port
+ (for non-default ports only). Any userinfo or default port should not
+ be present.
+ """
+ url = "http://username:password@example.org:80/echo_headers"
+
+ with httpx.Client(transport=httpx.MockTransport(echo_headers)) as client:
+ response = client.get(url)
+
+ assert response.status_code == 200
+ assert response.json() == {
+ "headers": {
+ "accept": "*/*",
+ "accept-encoding": "gzip, deflate, br, zstd",
+ "connection": "keep-alive",
+ "host": "example.org",
+ "user-agent": f"python-httpx/{httpx.__version__}",
+ "authorization": "Basic dXNlcm5hbWU6cGFzc3dvcmQ=",
+ }
+ }
+
+
+def test_host_with_non_default_port_in_url():
+ """
+ If the URL includes a non-default port, then it should be included in
+ the Host header.
+ """
+ url = "http://username:password@example.org:123/echo_headers"
+
+ with httpx.Client(transport=httpx.MockTransport(echo_headers)) as client:
+ response = client.get(url)
+
+ assert response.status_code == 200
+ assert response.json() == {
+ "headers": {
+ "accept": "*/*",
+ "accept-encoding": "gzip, deflate, br, zstd",
+ "connection": "keep-alive",
+ "host": "example.org:123",
+ "user-agent": f"python-httpx/{httpx.__version__}",
+ "authorization": "Basic dXNlcm5hbWU6cGFzc3dvcmQ=",
+ }
+ }
+
+
+def test_request_auto_headers():
+ request = httpx.Request("GET", "https://www.example.org/")
+ assert "host" in request.headers
+
+
+def test_same_origin():
+ origin = httpx.URL("https://example.com")
+ request = httpx.Request("GET", "HTTPS://EXAMPLE.COM:443")
+
+ with httpx.Client() as client:
+ headers = client._redirect_headers(request, origin, "GET")
+
+ assert headers["Host"] == request.url.netloc.decode("ascii")
+
+
+def test_not_same_origin():
+ origin = httpx.URL("https://example.com")
+ request = httpx.Request("GET", "HTTP://EXAMPLE.COM:80")
+
+ with httpx.Client() as client:
+ headers = client._redirect_headers(request, origin, "GET")
+
+ assert headers["Host"] == origin.netloc.decode("ascii")
+
+
+def test_is_https_redirect():
+ url = httpx.URL("https://example.com")
+ request = httpx.Request(
+ "GET", "http://example.com", headers={"Authorization": "empty"}
+ )
+
+ with httpx.Client() as client:
+ headers = client._redirect_headers(request, url, "GET")
+
+ assert "Authorization" in headers
+
+
+def test_is_not_https_redirect():
+ url = httpx.URL("https://www.example.com")
+ request = httpx.Request(
+ "GET", "http://example.com", headers={"Authorization": "empty"}
+ )
+
+ with httpx.Client() as client:
+ headers = client._redirect_headers(request, url, "GET")
+
+ assert "Authorization" not in headers
+
+
+def test_is_not_https_redirect_if_not_default_ports():
+ url = httpx.URL("https://example.com:1337")
+ request = httpx.Request(
+ "GET", "http://example.com:9999", headers={"Authorization": "empty"}
+ )
+
+ with httpx.Client() as client:
+ headers = client._redirect_headers(request, url, "GET")
+
+ assert "Authorization" not in headers
--- /dev/null
+import httpx
+
+
+def test_client_base_url():
+ with httpx.Client() as client:
+ client.base_url = "https://www.example.org/" # type: ignore
+ assert isinstance(client.base_url, httpx.URL)
+ assert client.base_url == "https://www.example.org/"
+
+
+def test_client_base_url_without_trailing_slash():
+ with httpx.Client() as client:
+ client.base_url = "https://www.example.org/path" # type: ignore
+ assert isinstance(client.base_url, httpx.URL)
+ assert client.base_url == "https://www.example.org/path/"
+
+
+def test_client_base_url_with_trailing_slash():
+ client = httpx.Client()
+ client.base_url = "https://www.example.org/path/" # type: ignore
+ assert isinstance(client.base_url, httpx.URL)
+ assert client.base_url == "https://www.example.org/path/"
+
+
+def test_client_headers():
+ with httpx.Client() as client:
+ client.headers = {"a": "b"} # type: ignore
+ assert isinstance(client.headers, httpx.Headers)
+ assert client.headers["A"] == "b"
+
+
+def test_client_cookies():
+ with httpx.Client() as client:
+ client.cookies = {"a": "b"} # type: ignore
+ assert isinstance(client.cookies, httpx.Cookies)
+ mycookies = list(client.cookies.jar)
+ assert len(mycookies) == 1
+ assert mycookies[0].name == "a" and mycookies[0].value == "b"
+
+
+def test_client_timeout():
+ expected_timeout = 12.0
+ with httpx.Client() as client:
+ client.timeout = expected_timeout # type: ignore
+
+ assert isinstance(client.timeout, httpx.Timeout)
+ assert client.timeout.connect == expected_timeout
+ assert client.timeout.read == expected_timeout
+ assert client.timeout.write == expected_timeout
+ assert client.timeout.pool == expected_timeout
+
+
+def test_client_event_hooks():
+ def on_request(request):
+ pass # pragma: no cover
+
+ with httpx.Client() as client:
+ client.event_hooks = {"request": [on_request]}
+ assert client.event_hooks == {"request": [on_request], "response": []}
+
+
+def test_client_trust_env():
+ with httpx.Client() as client:
+ assert client.trust_env
+
+ with httpx.Client(trust_env=False) as client:
+ assert not client.trust_env
--- /dev/null
+import httpcore
+import pytest
+
+import httpx
+
+
+def url_to_origin(url: str) -> httpcore.URL:
+ """
+ Given a URL string, return the origin in the raw tuple format that
+ `httpcore` uses for it's representation.
+ """
+ u = httpx.URL(url)
+ return httpcore.URL(scheme=u.raw_scheme, host=u.raw_host, port=u.port, target="/")
+
+
+def test_socks_proxy():
+ url = httpx.URL("http://www.example.com")
+
+ for proxy in ("socks5://localhost/", "socks5h://localhost/"):
+ with httpx.Client(proxy=proxy) as client:
+ transport = client._transport_for_url(url)
+ assert isinstance(transport, httpx.HTTPTransport)
+ assert isinstance(transport._pool, httpcore.SOCKSProxy)
+
+
+PROXY_URL = "http://[::1]"
+
+
+@pytest.mark.parametrize(
+ ["url", "proxies", "expected"],
+ [
+ ("http://example.com", {}, None),
+ ("http://example.com", {"https://": PROXY_URL}, None),
+ ("http://example.com", {"http://example.net": PROXY_URL}, None),
+ # Using "*" should match any domain name.
+ ("http://example.com", {"http://*": PROXY_URL}, PROXY_URL),
+ ("https://example.com", {"http://*": PROXY_URL}, None),
+ # Using "example.com" should match example.com, but not www.example.com
+ ("http://example.com", {"http://example.com": PROXY_URL}, PROXY_URL),
+ ("http://www.example.com", {"http://example.com": PROXY_URL}, None),
+ # Using "*.example.com" should match www.example.com, but not example.com
+ ("http://example.com", {"http://*.example.com": PROXY_URL}, None),
+ ("http://www.example.com", {"http://*.example.com": PROXY_URL}, PROXY_URL),
+ # Using "*example.com" should match example.com and www.example.com
+ ("http://example.com", {"http://*example.com": PROXY_URL}, PROXY_URL),
+ ("http://www.example.com", {"http://*example.com": PROXY_URL}, PROXY_URL),
+ ("http://wwwexample.com", {"http://*example.com": PROXY_URL}, None),
+ # ...
+ ("http://example.com:443", {"http://example.com": PROXY_URL}, PROXY_URL),
+ ("http://example.com", {"all://": PROXY_URL}, PROXY_URL),
+ ("http://example.com", {"http://": PROXY_URL}, PROXY_URL),
+ ("http://example.com", {"all://example.com": PROXY_URL}, PROXY_URL),
+ ("http://example.com", {"http://example.com": PROXY_URL}, PROXY_URL),
+ ("http://example.com", {"http://example.com:80": PROXY_URL}, PROXY_URL),
+ ("http://example.com:8080", {"http://example.com:8080": PROXY_URL}, PROXY_URL),
+ ("http://example.com:8080", {"http://example.com": PROXY_URL}, PROXY_URL),
+ (
+ "http://example.com",
+ {
+ "all://": PROXY_URL + ":1",
+ "http://": PROXY_URL + ":2",
+ "all://example.com": PROXY_URL + ":3",
+ "http://example.com": PROXY_URL + ":4",
+ },
+ PROXY_URL + ":4",
+ ),
+ (
+ "http://example.com",
+ {
+ "all://": PROXY_URL + ":1",
+ "http://": PROXY_URL + ":2",
+ "all://example.com": PROXY_URL + ":3",
+ },
+ PROXY_URL + ":3",
+ ),
+ (
+ "http://example.com",
+ {"all://": PROXY_URL + ":1", "http://": PROXY_URL + ":2"},
+ PROXY_URL + ":2",
+ ),
+ ],
+)
+def test_transport_for_request(url, proxies, expected):
+ mounts = {key: httpx.HTTPTransport(proxy=value) for key, value in proxies.items()}
+ with httpx.Client(mounts=mounts) as client:
+ transport = client._transport_for_url(httpx.URL(url))
+
+ if expected is None:
+ assert transport is client._transport
+ else:
+ assert isinstance(transport, httpx.HTTPTransport)
+ assert isinstance(transport._pool, httpcore.HTTPProxy)
+ assert transport._pool._proxy_url == url_to_origin(expected)
+
+
+@pytest.mark.network
+def test_proxy_close():
+ try:
+ transport = httpx.HTTPTransport(proxy=PROXY_URL)
+ client = httpx.Client(mounts={"https://": transport})
+ client.get("http://example.com")
+ finally:
+ client.close()
+
+
+def test_unsupported_proxy_scheme():
+ with pytest.raises(ValueError):
+ httpx.Client(proxy="ftp://127.0.0.1")
+
+
+@pytest.mark.parametrize(
+ ["url", "env", "expected"],
+ [
+ ("http://google.com", {}, None),
+ (
+ "http://google.com",
+ {"HTTP_PROXY": "http://example.com"},
+ "http://example.com",
+ ),
+ # Auto prepend http scheme
+ ("http://google.com", {"HTTP_PROXY": "example.com"}, "http://example.com"),
+ (
+ "http://google.com",
+ {"HTTP_PROXY": "http://example.com", "NO_PROXY": "google.com"},
+ None,
+ ),
+ # Everything proxied when NO_PROXY is empty/unset
+ (
+ "http://127.0.0.1",
+ {"ALL_PROXY": "http://localhost:123", "NO_PROXY": ""},
+ "http://localhost:123",
+ ),
+ # Not proxied if NO_PROXY matches URL.
+ (
+ "http://127.0.0.1",
+ {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "127.0.0.1"},
+ None,
+ ),
+ # Proxied if NO_PROXY scheme does not match URL.
+ (
+ "http://127.0.0.1",
+ {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "https://127.0.0.1"},
+ "http://localhost:123",
+ ),
+ # Proxied if NO_PROXY scheme does not match host.
+ (
+ "http://127.0.0.1",
+ {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "1.1.1.1"},
+ "http://localhost:123",
+ ),
+ # Not proxied if NO_PROXY matches host domain suffix.
+ (
+ "http://courses.mit.edu",
+ {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu"},
+ None,
+ ),
+ # Proxied even though NO_PROXY matches host domain *prefix*.
+ (
+ "https://mit.edu.info",
+ {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu"},
+ "http://localhost:123",
+ ),
+ # Not proxied if one item in NO_PROXY case matches host domain suffix.
+ (
+ "https://mit.edu.info",
+ {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu,edu.info"},
+ None,
+ ),
+ # Not proxied if one item in NO_PROXY case matches host domain suffix.
+ # May include whitespace.
+ (
+ "https://mit.edu.info",
+ {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu, edu.info"},
+ None,
+ ),
+ # Proxied if no items in NO_PROXY match.
+ (
+ "https://mit.edu.info",
+ {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu,mit.info"},
+ "http://localhost:123",
+ ),
+ # Proxied if NO_PROXY domain doesn't match.
+ (
+ "https://foo.example.com",
+ {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "www.example.com"},
+ "http://localhost:123",
+ ),
+ # Not proxied for subdomains matching NO_PROXY, with a leading ".".
+ (
+ "https://www.example1.com",
+ {"ALL_PROXY": "http://localhost:123", "NO_PROXY": ".example1.com"},
+ None,
+ ),
+ # Proxied, because NO_PROXY subdomains only match if "." separated.
+ (
+ "https://www.example2.com",
+ {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "ample2.com"},
+ "http://localhost:123",
+ ),
+ # No requests are proxied if NO_PROXY="*" is set.
+ (
+ "https://www.example3.com",
+ {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "*"},
+ None,
+ ),
+ ],
+)
+def test_proxies_environ(monkeypatch, url, env, expected):
+ for name, value in env.items():
+ monkeypatch.setenv(name, value)
+
+ with httpx.Client() as client:
+ transport = client._transport_for_url(httpx.URL(url))
+
+ if expected is None:
+ assert transport == client._transport
+ else:
+ assert transport._pool._proxy_url == url_to_origin(expected) # type: ignore
+
+
+@pytest.mark.parametrize(
+ ["proxies", "is_valid"],
+ [
+ ({"http": "http://127.0.0.1"}, False),
+ ({"https": "http://127.0.0.1"}, False),
+ ({"all": "http://127.0.0.1"}, False),
+ ({"http://": "http://127.0.0.1"}, True),
+ ({"https://": "http://127.0.0.1"}, True),
+ ({"all://": "http://127.0.0.1"}, True),
+ ],
+)
+def test_for_deprecated_proxy_params(proxies, is_valid):
+ mounts = {key: httpx.HTTPTransport(proxy=value) for key, value in proxies.items()}
+
+ if not is_valid:
+ with pytest.raises(ValueError):
+ httpx.Client(mounts=mounts)
+ else:
+ httpx.Client(mounts=mounts)
+
+
+def test_proxy_with_mounts():
+ proxy_transport = httpx.HTTPTransport(proxy="http://127.0.0.1")
+
+ with httpx.Client(mounts={"http://": proxy_transport}) as client:
+ transport = client._transport_for_url(httpx.URL("http://example.com"))
+ assert transport == proxy_transport
--- /dev/null
+import httpx
+
+
+def hello_world(request: httpx.Request) -> httpx.Response:
+ return httpx.Response(200, text="Hello, world")
+
+
+def test_client_queryparams():
+ client = httpx.Client(params={"a": "b"})
+ assert isinstance(client.params, httpx.QueryParams)
+ assert client.params["a"] == "b"
+
+
+def test_client_queryparams_string():
+ with httpx.Client(params="a=b") as client:
+ assert isinstance(client.params, httpx.QueryParams)
+ assert client.params["a"] == "b"
+
+ with httpx.Client() as client:
+ client.params = "a=b" # type: ignore
+ assert isinstance(client.params, httpx.QueryParams)
+ assert client.params["a"] == "b"
+
+
+def test_client_queryparams_echo():
+ url = "http://example.org/echo_queryparams"
+ client_queryparams = "first=str"
+ request_queryparams = {"second": "dict"}
+ with httpx.Client(
+ transport=httpx.MockTransport(hello_world), params=client_queryparams
+ ) as client:
+ response = client.get(url, params=request_queryparams)
+
+ assert response.status_code == 200
+ assert (
+ response.url == "http://example.org/echo_queryparams?first=str&second=dict"
+ )
--- /dev/null
+import typing
+
+import pytest
+
+import httpx
+
+
+def redirects(request: httpx.Request) -> httpx.Response:
+ if request.url.scheme not in ("http", "https"):
+ raise httpx.UnsupportedProtocol(f"Scheme {request.url.scheme!r} not supported.")
+
+ if request.url.path == "/redirect_301":
+ status_code = httpx.codes.MOVED_PERMANENTLY
+ content = b"<a href='https://example.org/'>here</a>"
+ headers = {"location": "https://example.org/"}
+ return httpx.Response(status_code, headers=headers, content=content)
+
+ elif request.url.path == "/redirect_302":
+ status_code = httpx.codes.FOUND
+ headers = {"location": "https://example.org/"}
+ return httpx.Response(status_code, headers=headers)
+
+ elif request.url.path == "/redirect_303":
+ status_code = httpx.codes.SEE_OTHER
+ headers = {"location": "https://example.org/"}
+ return httpx.Response(status_code, headers=headers)
+
+ elif request.url.path == "/relative_redirect":
+ status_code = httpx.codes.SEE_OTHER
+ headers = {"location": "/"}
+ return httpx.Response(status_code, headers=headers)
+
+ elif request.url.path == "/malformed_redirect":
+ status_code = httpx.codes.SEE_OTHER
+ headers = {"location": "https://:443/"}
+ return httpx.Response(status_code, headers=headers)
+
+ elif request.url.path == "/invalid_redirect":
+ status_code = httpx.codes.SEE_OTHER
+ raw_headers = [(b"location", "https://😇/".encode("utf-8"))]
+ return httpx.Response(status_code, headers=raw_headers)
+
+ elif request.url.path == "/no_scheme_redirect":
+ status_code = httpx.codes.SEE_OTHER
+ headers = {"location": "//example.org/"}
+ return httpx.Response(status_code, headers=headers)
+
+ elif request.url.path == "/multiple_redirects":
+ params = httpx.QueryParams(request.url.query)
+ count = int(params.get("count", "0"))
+ redirect_count = count - 1
+ status_code = httpx.codes.SEE_OTHER if count else httpx.codes.OK
+ if count:
+ location = "/multiple_redirects"
+ if redirect_count:
+ location += f"?count={redirect_count}"
+ headers = {"location": location}
+ else:
+ headers = {}
+ return httpx.Response(status_code, headers=headers)
+
+ if request.url.path == "/redirect_loop":
+ status_code = httpx.codes.SEE_OTHER
+ headers = {"location": "/redirect_loop"}
+ return httpx.Response(status_code, headers=headers)
+
+ elif request.url.path == "/cross_domain":
+ status_code = httpx.codes.SEE_OTHER
+ headers = {"location": "https://example.org/cross_domain_target"}
+ return httpx.Response(status_code, headers=headers)
+
+ elif request.url.path == "/cross_domain_target":
+ status_code = httpx.codes.OK
+ data = {
+ "body": request.content.decode("ascii"),
+ "headers": dict(request.headers),
+ }
+ return httpx.Response(status_code, json=data)
+
+ elif request.url.path == "/redirect_body":
+ status_code = httpx.codes.PERMANENT_REDIRECT
+ headers = {"location": "/redirect_body_target"}
+ return httpx.Response(status_code, headers=headers)
+
+ elif request.url.path == "/redirect_no_body":
+ status_code = httpx.codes.SEE_OTHER
+ headers = {"location": "/redirect_body_target"}
+ return httpx.Response(status_code, headers=headers)
+
+ elif request.url.path == "/redirect_body_target":
+ data = {
+ "body": request.content.decode("ascii"),
+ "headers": dict(request.headers),
+ }
+ return httpx.Response(200, json=data)
+
+ elif request.url.path == "/cross_subdomain":
+ if request.headers["Host"] != "www.example.org":
+ status_code = httpx.codes.PERMANENT_REDIRECT
+ headers = {"location": "https://www.example.org/cross_subdomain"}
+ return httpx.Response(status_code, headers=headers)
+ else:
+ return httpx.Response(200, text="Hello, world!")
+
+ elif request.url.path == "/redirect_custom_scheme":
+ status_code = httpx.codes.MOVED_PERMANENTLY
+ headers = {"location": "market://details?id=42"}
+ return httpx.Response(status_code, headers=headers)
+
+ if request.method == "HEAD":
+ return httpx.Response(200)
+
+ return httpx.Response(200, html="<html><body>Hello, world!</body></html>")
+
+
+def test_redirect_301():
+ with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
+ response = client.post(
+ "https://example.org/redirect_301", follow_redirects=True
+ )
+ assert response.status_code == httpx.codes.OK
+ assert response.url == "https://example.org/"
+ assert len(response.history) == 1
+
+
+def test_redirect_302():
+ with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
+ response = client.post(
+ "https://example.org/redirect_302", follow_redirects=True
+ )
+ assert response.status_code == httpx.codes.OK
+ assert response.url == "https://example.org/"
+ assert len(response.history) == 1
+
+
+def test_redirect_303():
+ with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
+ response = client.get("https://example.org/redirect_303", follow_redirects=True)
+ assert response.status_code == httpx.codes.OK
+ assert response.url == "https://example.org/"
+ assert len(response.history) == 1
+
+
+def test_next_request():
+ with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
+ request = client.build_request("POST", "https://example.org/redirect_303")
+ response = client.send(request, follow_redirects=False)
+ assert response.status_code == httpx.codes.SEE_OTHER
+ assert response.url == "https://example.org/redirect_303"
+ assert response.next_request is not None
+
+ response = client.send(response.next_request, follow_redirects=False)
+ assert response.status_code == httpx.codes.OK
+ assert response.url == "https://example.org/"
+ assert response.next_request is None
+
+
+def test_head_redirect():
+ """
+ Contrary to Requests, redirects remain enabled by default for HEAD requests.
+ """
+ with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
+ response = client.head(
+ "https://example.org/redirect_302", follow_redirects=True
+ )
+ assert response.status_code == httpx.codes.OK
+ assert response.url == "https://example.org/"
+ assert response.request.method == "HEAD"
+ assert len(response.history) == 1
+ assert response.text == ""
+
+
+def test_relative_redirect():
+ with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
+ response = client.get(
+ "https://example.org/relative_redirect", follow_redirects=True
+ )
+ assert response.status_code == httpx.codes.OK
+ assert response.url == "https://example.org/"
+ assert len(response.history) == 1
+
+
+def test_malformed_redirect():
+ # https://github.com/encode/httpx/issues/771
+ with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
+ response = client.get(
+ "http://example.org/malformed_redirect", follow_redirects=True
+ )
+ assert response.status_code == httpx.codes.OK
+ assert response.url == "https://example.org:443/"
+ assert len(response.history) == 1
+
+
+def test_no_scheme_redirect():
+ with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
+ response = client.get(
+ "https://example.org/no_scheme_redirect", follow_redirects=True
+ )
+ assert response.status_code == httpx.codes.OK
+ assert response.url == "https://example.org/"
+ assert len(response.history) == 1
+
+
+def test_fragment_redirect():
+ with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
+ response = client.get(
+ "https://example.org/relative_redirect#fragment", follow_redirects=True
+ )
+ assert response.status_code == httpx.codes.OK
+ assert response.url == "https://example.org/#fragment"
+ assert len(response.history) == 1
+
+
+def test_multiple_redirects():
+ with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
+ response = client.get(
+ "https://example.org/multiple_redirects?count=20", follow_redirects=True
+ )
+ assert response.status_code == httpx.codes.OK
+ assert response.url == "https://example.org/multiple_redirects"
+ assert len(response.history) == 20
+ assert (
+ response.history[0].url == "https://example.org/multiple_redirects?count=20"
+ )
+ assert (
+ response.history[1].url == "https://example.org/multiple_redirects?count=19"
+ )
+ assert len(response.history[0].history) == 0
+ assert len(response.history[1].history) == 1
+
+
+def test_too_many_redirects():
+ with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
+ with pytest.raises(httpx.TooManyRedirects):
+ client.get(
+ "https://example.org/multiple_redirects?count=21", follow_redirects=True
+ )
+
+
+def test_redirect_loop():
+ with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
+ with pytest.raises(httpx.TooManyRedirects):
+ client.get("https://example.org/redirect_loop", follow_redirects=True)
+
+
+def test_cross_domain_redirect_with_auth_header():
+ with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
+ url = "https://example.com/cross_domain"
+ headers = {"Authorization": "abc"}
+ response = client.get(url, headers=headers, follow_redirects=True)
+ assert response.url == "https://example.org/cross_domain_target"
+ assert "authorization" not in response.json()["headers"]
+
+
+def test_cross_domain_https_redirect_with_auth_header():
+ with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
+ url = "http://example.com/cross_domain"
+ headers = {"Authorization": "abc"}
+ response = client.get(url, headers=headers, follow_redirects=True)
+ assert response.url == "https://example.org/cross_domain_target"
+ assert "authorization" not in response.json()["headers"]
+
+
+def test_cross_domain_redirect_with_auth():
+ with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
+ url = "https://example.com/cross_domain"
+ response = client.get(url, auth=("user", "pass"), follow_redirects=True)
+ assert response.url == "https://example.org/cross_domain_target"
+ assert "authorization" not in response.json()["headers"]
+
+
+def test_same_domain_redirect():
+ with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
+ url = "https://example.org/cross_domain"
+ headers = {"Authorization": "abc"}
+ response = client.get(url, headers=headers, follow_redirects=True)
+ assert response.url == "https://example.org/cross_domain_target"
+ assert response.json()["headers"]["authorization"] == "abc"
+
+
+def test_same_domain_https_redirect_with_auth_header():
+ with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
+ url = "http://example.org/cross_domain"
+ headers = {"Authorization": "abc"}
+ response = client.get(url, headers=headers, follow_redirects=True)
+ assert response.url == "https://example.org/cross_domain_target"
+ assert response.json()["headers"]["authorization"] == "abc"
+
+
+def test_body_redirect():
+ """
+ A 308 redirect should preserve the request body.
+ """
+ with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
+ url = "https://example.org/redirect_body"
+ content = b"Example request body"
+ response = client.post(url, content=content, follow_redirects=True)
+ assert response.url == "https://example.org/redirect_body_target"
+ assert response.json()["body"] == "Example request body"
+ assert "content-length" in response.json()["headers"]
+
+
+def test_no_body_redirect():
+ """
+ A 303 redirect should remove the request body.
+ """
+ with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
+ url = "https://example.org/redirect_no_body"
+ content = b"Example request body"
+ response = client.post(url, content=content, follow_redirects=True)
+ assert response.url == "https://example.org/redirect_body_target"
+ assert response.json()["body"] == ""
+ assert "content-length" not in response.json()["headers"]
+
+
+def test_can_stream_if_no_redirect():
+ with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
+ url = "https://example.org/redirect_301"
+ with client.stream("GET", url, follow_redirects=False) as response:
+ pass
+ assert response.status_code == httpx.codes.MOVED_PERMANENTLY
+ assert response.headers["location"] == "https://example.org/"
+
+
+class ConsumeBodyTransport(httpx.MockTransport):
+ def handle_request(self, request: httpx.Request) -> httpx.Response:
+ assert isinstance(request.stream, httpx.SyncByteStream)
+ for _ in request.stream:
+ pass
+ return self.handler(request) # type: ignore[return-value]
+
+
+def test_cannot_redirect_streaming_body():
+ with httpx.Client(transport=ConsumeBodyTransport(redirects)) as client:
+ url = "https://example.org/redirect_body"
+
+ def streaming_body() -> typing.Iterator[bytes]:
+ yield b"Example request body" # pragma: no cover
+
+ with pytest.raises(httpx.StreamConsumed):
+ client.post(url, content=streaming_body(), follow_redirects=True)
+
+
+def test_cross_subdomain_redirect():
+ with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
+ url = "https://example.com/cross_subdomain"
+ response = client.get(url, follow_redirects=True)
+ assert response.url == "https://www.example.org/cross_subdomain"
+
+
+def cookie_sessions(request: httpx.Request) -> httpx.Response:
+ if request.url.path == "/":
+ cookie = request.headers.get("Cookie")
+ if cookie is not None:
+ content = b"Logged in"
+ else:
+ content = b"Not logged in"
+ return httpx.Response(200, content=content)
+
+ elif request.url.path == "/login":
+ status_code = httpx.codes.SEE_OTHER
+ headers = {
+ "location": "/",
+ "set-cookie": (
+ "session=eyJ1c2VybmFtZSI6ICJ0b21; path=/; Max-Age=1209600; "
+ "httponly; samesite=lax"
+ ),
+ }
+ return httpx.Response(status_code, headers=headers)
+
+ else:
+ assert request.url.path == "/logout"
+ status_code = httpx.codes.SEE_OTHER
+ headers = {
+ "location": "/",
+ "set-cookie": (
+ "session=null; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT; "
+ "httponly; samesite=lax"
+ ),
+ }
+ return httpx.Response(status_code, headers=headers)
+
+
+def test_redirect_cookie_behavior():
+ with httpx.Client(
+ transport=httpx.MockTransport(cookie_sessions), follow_redirects=True
+ ) as client:
+ # The client is not logged in.
+ response = client.get("https://example.com/")
+ assert response.url == "https://example.com/"
+ assert response.text == "Not logged in"
+
+ # Login redirects to the homepage, setting a session cookie.
+ response = client.post("https://example.com/login")
+ assert response.url == "https://example.com/"
+ assert response.text == "Logged in"
+
+ # The client is logged in.
+ response = client.get("https://example.com/")
+ assert response.url == "https://example.com/"
+ assert response.text == "Logged in"
+
+ # Logout redirects to the homepage, expiring the session cookie.
+ response = client.post("https://example.com/logout")
+ assert response.url == "https://example.com/"
+ assert response.text == "Not logged in"
+
+ # The client is not logged in.
+ response = client.get("https://example.com/")
+ assert response.url == "https://example.com/"
+ assert response.text == "Not logged in"
+
+
+def test_redirect_custom_scheme():
+ with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
+ with pytest.raises(httpx.UnsupportedProtocol) as e:
+ client.post(
+ "https://example.org/redirect_custom_scheme", follow_redirects=True
+ )
+ assert str(e.value) == "Scheme 'market' not supported."
+
+
+def test_invalid_redirect():
+ with httpx.Client(transport=httpx.MockTransport(redirects)) as client:
+ with pytest.raises(httpx.RemoteProtocolError):
+ client.get("http://example.org/invalid_redirect", follow_redirects=True)