From: Kar Petrosyan Date: Thu, 27 Feb 2025 16:38:08 +0000 (+0400) Subject: Use unasync for tests X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a2fc6825a513abe8ed3ca269510ba14e8b61683f;p=thirdparty%2Fhttpx.git Use unasync for tests --- diff --git a/scripts/check b/scripts/check index a4bce094..90e9a417 100755 --- a/scripts/check +++ b/scripts/check @@ -12,3 +12,4 @@ set -x ${PREFIX}ruff format $SOURCE_FILES --diff ${PREFIX}mypy $SOURCE_FILES ${PREFIX}ruff check $SOURCE_FILES +${PREFIX}python scripts/unasync.py --check diff --git a/scripts/lint b/scripts/lint index 6d096d76..64bf1db1 100755 --- a/scripts/lint +++ b/scripts/lint @@ -10,3 +10,4 @@ set -x ${PREFIX}ruff check --fix $SOURCE_FILES ${PREFIX}ruff format $SOURCE_FILES +${PREFIX}python scripts/unasync.py diff --git a/scripts/unasync.py b/scripts/unasync.py new file mode 100755 index 00000000..6d127638 --- /dev/null +++ b/scripts/unasync.py @@ -0,0 +1,92 @@ +#!venv/bin/python +import os +import re +import sys +from pprint import pprint + +SUBS = [ + # httpx specific + ('AsyncByteStream', 'SyncByteStream'), + ('async_auth_flow', 'sync_auth_flow'), + ('handle_async_request', 'handle_request'), + # general + ('AsyncIterator', 'Iterator'), + ('from anyio import Lock', 'from threading import Lock'), + ('Async([A-Z][A-Za-z0-9_]*)', r'\2'), + ('async def', 'def'), + ('async with', 'with'), + ('async for', 'for'), + ('await ', ''), + ('aclose', 'close'), + ('aread', 'read'), + ('__aenter__', '__enter__'), + ('__aexit__', '__exit__'), + ('__aiter__', '__iter__'), + ('@pytest.mark.anyio', ''), +] +COMPILED_SUBS = [ + (re.compile(r'(^|\b)' + regex + r'($|\b)'), repl) + for regex, repl in SUBS +] + +USED_SUBS = set() + +def unasync_line(line): + for index, (regex, repl) in enumerate(COMPILED_SUBS): + old_line = line + line = re.sub(regex, repl, line) + if old_line != line: + USED_SUBS.add(index) + return line + + +def unasync_file(in_path, out_path): + with open(in_path, "r") as in_file: + with open(out_path, "w", newline="") as out_file: + for line in in_file.readlines(): + line = unasync_line(line) + out_file.write(line) + + +def unasync_file_check(in_path, out_path): + with open(in_path, "r") as in_file: + with open(out_path, "r") as out_file: + for in_line, out_line in zip(in_file.readlines(), out_file.readlines()): + expected = unasync_line(in_line) + if out_line != expected: + print(f'unasync mismatch between {in_path!r} and {out_path!r}') + print(f'Async code: {in_line!r}') + print(f'Expected sync code: {expected!r}') + print(f'Actual sync code: {out_line!r}') + sys.exit(1) + + +def unasync_dir(in_dir, out_dir, check_only=False): + for dirpath, dirnames, filenames in os.walk(in_dir): + for filename in filenames: + if not filename.endswith('.py'): + continue + rel_dir = os.path.relpath(dirpath, in_dir) + in_path = os.path.normpath(os.path.join(in_dir, rel_dir, filename)) + out_path = os.path.normpath(os.path.join(out_dir, rel_dir, filename)) + print(in_path, '->', out_path) + if check_only: + unasync_file_check(in_path, out_path) + else: + unasync_file(in_path, out_path) + + +def main(): + check_only = '--check' in sys.argv + unasync_dir("tests/client/async", "tests/client/sync", check_only=check_only) + + if len(USED_SUBS) != len(SUBS): + unused_subs = [SUBS[i] for i in range(len(SUBS)) if i not in USED_SUBS] + + print("These patterns were not used:") + pprint(unused_subs) + exit(1) + + +if __name__ == '__main__': + main() diff --git a/tests/client/async/__init__.py b/tests/client/async/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/client/test_auth.py b/tests/client/async/test_auth.py similarity index 98% rename from tests/client/test_auth.py rename to tests/client/async/test_auth.py index f1965766..da63f1ef 100644 --- a/tests/client/test_auth.py +++ b/tests/client/async/test_auth.py @@ -11,12 +11,12 @@ import sys import typing from urllib.request import parse_keqv_list -import anyio import pytest +from anyio import Lock import httpx -from ..common import FIXTURES_DIR +from ...common import FIXTURES_DIR class App: @@ -140,13 +140,11 @@ class AsyncAuth(httpx.Auth): """ def __init__(self) -> None: - self._async_lock = anyio.Lock() + self._lock = Lock() - async def async_auth_flow( - self, request: httpx.Request - ) -> typing.AsyncGenerator[httpx.Request, httpx.Response]: - async with self._async_lock: - request.headers["Authorization"] = "async-auth" + async def async_auth_flow(self, request: httpx.Request) -> typing.Any: + async with self._lock: + request.headers["Authorization"] = "auth" yield request @@ -694,4 +692,4 @@ async def test_auth() -> None: response = await client.get(url, auth=auth) assert response.status_code == 200 - assert response.json() == {"auth": "async-auth"} + assert response.json() == {"auth": "auth"} diff --git a/tests/client/test_async_client.py b/tests/client/async/test_client.py similarity index 96% rename from tests/client/test_async_client.py rename to tests/client/async/test_client.py index 8d7eaa3c..822f665e 100644 --- a/tests/client/test_async_client.py +++ b/tests/client/async/test_client.py @@ -100,17 +100,6 @@ async def test_stream_request(server): assert response.status_code == 200 -@pytest.mark.anyio -async def test_cannot_stream_sync_request(server): - def hello_world() -> typing.Iterator[bytes]: # pragma: no cover - yield b"Hello, " - yield b"world!" - - async with httpx.AsyncClient() as client: - with pytest.raises(RuntimeError): - await client.post(server.url, content=hello_world()) - - @pytest.mark.anyio async def test_raise_for_status(server): async with httpx.AsyncClient() as client: @@ -314,7 +303,7 @@ async def test_mounted_transport(): @pytest.mark.anyio -async def test_async_mock_transport(): +async def test_mock_transport(): async def hello_world(request: httpx.Request) -> httpx.Response: return httpx.Response(200, text="Hello, world!") diff --git a/tests/client/test_cookies.py b/tests/client/async/test_cookies.py similarity index 100% rename from tests/client/test_cookies.py rename to tests/client/async/test_cookies.py diff --git a/tests/client/test_event_hooks.py b/tests/client/async/test_event_hooks.py similarity index 100% rename from tests/client/test_event_hooks.py rename to tests/client/async/test_event_hooks.py diff --git a/tests/client/test_headers.py b/tests/client/async/test_headers.py similarity index 100% rename from tests/client/test_headers.py rename to tests/client/async/test_headers.py diff --git a/tests/client/test_properties.py b/tests/client/async/test_properties.py similarity index 100% rename from tests/client/test_properties.py rename to tests/client/async/test_properties.py diff --git a/tests/client/test_proxies.py b/tests/client/async/test_proxies.py similarity index 100% rename from tests/client/test_proxies.py rename to tests/client/async/test_proxies.py diff --git a/tests/client/test_queryparams.py b/tests/client/async/test_queryparams.py similarity index 100% rename from tests/client/test_queryparams.py rename to tests/client/async/test_queryparams.py diff --git a/tests/client/test_redirects.py b/tests/client/async/test_redirects.py similarity index 100% rename from tests/client/test_redirects.py rename to tests/client/async/test_redirects.py diff --git a/tests/client/sync/__init__.py b/tests/client/sync/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/client/sync/test_auth.py b/tests/client/sync/test_auth.py new file mode 100644 index 00000000..8aa1671c --- /dev/null +++ b/tests/client/sync/test_auth.py @@ -0,0 +1,655 @@ +""" +Integration tests for authentication. + +Unit tests for auth classes also exist in tests/test_auth.py +""" + +import hashlib +import netrc +import os +import sys +import typing +from threading import Lock +from urllib.request import parse_keqv_list + +import pytest + +import httpx + +from ...common import FIXTURES_DIR + + +class App: + """ + A mock app to test auth credentials. + """ + + def __init__(self, auth_header: str = "", status_code: int = 200) -> None: + self.auth_header = auth_header + self.status_code = status_code + + def __call__(self, request: httpx.Request) -> httpx.Response: + headers = {"www-authenticate": self.auth_header} if self.auth_header else {} + data = {"auth": request.headers.get("Authorization")} + return httpx.Response(self.status_code, headers=headers, json=data) + + +class DigestApp: + def __init__( + self, + algorithm: str = "SHA-256", + send_response_after_attempt: int = 1, + qop: str = "auth", + regenerate_nonce: bool = True, + ) -> None: + self.algorithm = algorithm + self.send_response_after_attempt = send_response_after_attempt + self.qop = qop + self._regenerate_nonce = regenerate_nonce + self._response_count = 0 + + def __call__(self, request: httpx.Request) -> httpx.Response: + if self._response_count < self.send_response_after_attempt: + return self.challenge_send(request) + + data = {"auth": request.headers.get("Authorization")} + return httpx.Response(200, json=data) + + def challenge_send(self, request: httpx.Request) -> httpx.Response: + self._response_count += 1 + nonce = ( + hashlib.sha256(os.urandom(8)).hexdigest() + if self._regenerate_nonce + else "ee96edced2a0b43e4869e96ebe27563f369c1205a049d06419bb51d8aeddf3d3" + ) + challenge_data = { + "nonce": nonce, + "qop": self.qop, + "opaque": ( + "ee6378f3ee14ebfd2fff54b70a91a7c9390518047f242ab2271380db0e14bda1" + ), + "algorithm": self.algorithm, + "stale": "FALSE", + } + challenge_str = ", ".join( + '{}="{}"'.format(key, value) + for key, value in challenge_data.items() + if value + ) + + headers = { + "www-authenticate": f'Digest realm="httpx@example.org", {challenge_str}', + } + return httpx.Response(401, headers=headers) + + +class RepeatAuth(httpx.Auth): + """ + A mock authentication scheme that requires clients to send + the request a fixed number of times, and then send a last request containing + an aggregation of nonces that the server sent in 'WWW-Authenticate' headers + of intermediate responses. + """ + + requires_request_body = True + + def __init__(self, repeat: int) -> None: + self.repeat = repeat + + def auth_flow( + self, request: httpx.Request + ) -> typing.Generator[httpx.Request, httpx.Response, None]: + nonces = [] + + for index in range(self.repeat): + request.headers["Authorization"] = f"Repeat {index}" + response = yield request + nonces.append(response.headers["www-authenticate"]) + + key = ".".join(nonces) + request.headers["Authorization"] = f"Repeat {key}" + yield request + + +class ResponseBodyAuth(httpx.Auth): + """ + A mock authentication scheme that requires clients to send an 'Authorization' + header, then send back the contents of the response in the 'Authorization' + header. + """ + + requires_response_body = True + + def __init__(self, token: str) -> None: + self.token = token + + def auth_flow( + self, request: httpx.Request + ) -> typing.Generator[httpx.Request, httpx.Response, None]: + request.headers["Authorization"] = self.token + response = yield request + data = response.text + request.headers["Authorization"] = data + yield request + + +class Auth(httpx.Auth): + """ + A mock authentication scheme that uses a different implementation for the + sync and async cases. + """ + + def __init__(self) -> None: + self._lock = Lock() + + def sync_auth_flow(self, request: httpx.Request) -> typing.Any: + with self._lock: + request.headers["Authorization"] = "auth" + yield request + + +def test_basic_auth() -> None: + url = "https://example.org/" + auth = ("user", "password123") + app = App() + + with httpx.Client(transport=httpx.MockTransport(app)) as client: + response = client.get(url, auth=auth) + + assert response.status_code == 200 + assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="} + + +def test_basic_auth_with_stream() -> None: + """ + See: https://github.com/encode/httpx/pull/1312 + """ + url = "https://example.org/" + auth = ("user", "password123") + app = App() + + with httpx.Client(transport=httpx.MockTransport(app), auth=auth) as client: + with client.stream("GET", url) as response: + response.read() + + assert response.status_code == 200 + assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="} + + +def test_basic_auth_in_url() -> None: + url = "https://user:password123@example.org/" + app = App() + + with httpx.Client(transport=httpx.MockTransport(app)) as client: + response = client.get(url) + + assert response.status_code == 200 + assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="} + + +def test_basic_auth_on_session() -> None: + url = "https://example.org/" + auth = ("user", "password123") + app = App() + + with httpx.Client(transport=httpx.MockTransport(app), auth=auth) as client: + response = client.get(url) + + assert response.status_code == 200 + assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="} + + +def test_custom_auth() -> None: + url = "https://example.org/" + app = App() + + def auth(request: httpx.Request) -> httpx.Request: + request.headers["Authorization"] = "Token 123" + return request + + with httpx.Client(transport=httpx.MockTransport(app)) as client: + response = client.get(url, auth=auth) + + assert response.status_code == 200 + assert response.json() == {"auth": "Token 123"} + + +def test_netrc_auth_credentials_exist() -> None: + """ + When netrc auth is being used and a request is made to a host that is + in the netrc file, then the relevant credentials should be applied. + """ + netrc_file = str(FIXTURES_DIR / ".netrc") + url = "http://netrcexample.org" + app = App() + auth = httpx.NetRCAuth(netrc_file) + + with httpx.Client(transport=httpx.MockTransport(app), auth=auth) as client: + response = client.get(url) + + assert response.status_code == 200 + assert response.json() == { + "auth": "Basic ZXhhbXBsZS11c2VybmFtZTpleGFtcGxlLXBhc3N3b3Jk" + } + + +def test_netrc_auth_credentials_do_not_exist() -> None: + """ + When netrc auth is being used and a request is made to a host that is + not in the netrc file, then no credentials should be applied. + """ + netrc_file = str(FIXTURES_DIR / ".netrc") + url = "http://example.org" + app = App() + auth = httpx.NetRCAuth(netrc_file) + + with httpx.Client(transport=httpx.MockTransport(app), auth=auth) as client: + response = client.get(url) + + assert response.status_code == 200 + assert response.json() == {"auth": None} + + +@pytest.mark.skipif( + sys.version_info >= (3, 11), + reason="netrc files without a password are valid from Python >= 3.11", +) +def test_netrc_auth_nopassword_parse_error() -> None: # pragma: no cover + """ + Python has different netrc parsing behaviours with different versions. + For Python < 3.11 a netrc file with no password is invalid. In this case + we want to allow the parse error to be raised. + """ + netrc_file = str(FIXTURES_DIR / ".netrc-nopassword") + with pytest.raises(netrc.NetrcParseError): + httpx.NetRCAuth(netrc_file) + + +def test_auth_disable_per_request() -> None: + url = "https://example.org/" + auth = ("user", "password123") + app = App() + + with httpx.Client(transport=httpx.MockTransport(app), auth=auth) as client: + response = client.get(url, auth=None) + + assert response.status_code == 200 + assert response.json() == {"auth": None} + + +def test_auth_hidden_url() -> None: + url = "http://example-username:example-password@example.org/" + expected = "URL('http://example-username:[secure]@example.org/')" + assert url == httpx.URL(url) + assert expected == repr(httpx.URL(url)) + + +def test_auth_hidden_header() -> None: + url = "https://example.org/" + auth = ("example-username", "example-password") + app = App() + + with httpx.Client(transport=httpx.MockTransport(app)) as client: + response = client.get(url, auth=auth) + + assert "'authorization': '[secure]'" in str(response.request.headers) + + +def test_auth_property() -> None: + app = App() + + with httpx.Client(transport=httpx.MockTransport(app)) as client: + assert client.auth is None + + client.auth = ("user", "password123") # type: ignore + assert isinstance(client.auth, httpx.BasicAuth) + + url = "https://example.org/" + response = client.get(url) + assert response.status_code == 200 + assert response.json() == {"auth": "Basic dXNlcjpwYXNzd29yZDEyMw=="} + + +def test_auth_invalid_type() -> None: + app = App() + + with pytest.raises(TypeError): + client = httpx.Client( + transport=httpx.MockTransport(app), + auth="not a tuple, not a callable", # type: ignore + ) + + with httpx.Client(transport=httpx.MockTransport(app)) as client: + with pytest.raises(TypeError): + client.get(auth="not a tuple, not a callable") # type: ignore + + with pytest.raises(TypeError): + client.auth = "not a tuple, not a callable" # type: ignore + + +def test_digest_auth_returns_no_auth_if_no_digest_header_in_response() -> None: + url = "https://example.org/" + auth = httpx.DigestAuth(username="user", password="password123") + app = App() + + with httpx.Client(transport=httpx.MockTransport(app)) as client: + response = client.get(url, auth=auth) + + assert response.status_code == 200 + assert response.json() == {"auth": None} + assert len(response.history) == 0 + + +def test_digest_auth_returns_no_auth_if_alternate_auth_scheme() -> None: + url = "https://example.org/" + auth = httpx.DigestAuth(username="user", password="password123") + auth_header = "Token ..." + app = App(auth_header=auth_header, status_code=401) + + with httpx.Client(transport=httpx.MockTransport(app)) as client: + response = client.get(url, auth=auth) + + assert response.status_code == 401 + assert response.json() == {"auth": None} + assert len(response.history) == 0 + + +def test_digest_auth_200_response_including_digest_auth_header() -> None: + url = "https://example.org/" + auth = httpx.DigestAuth(username="user", password="password123") + auth_header = 'Digest realm="realm@host.com",qop="auth",nonce="abc",opaque="xyz"' + app = App(auth_header=auth_header, status_code=200) + + with httpx.Client(transport=httpx.MockTransport(app)) as client: + response = client.get(url, auth=auth) + + assert response.status_code == 200 + assert response.json() == {"auth": None} + assert len(response.history) == 0 + + +def test_digest_auth_401_response_without_digest_auth_header() -> None: + url = "https://example.org/" + auth = httpx.DigestAuth(username="user", password="password123") + app = App(auth_header="", status_code=401) + + with httpx.Client(transport=httpx.MockTransport(app)) as client: + response = client.get(url, auth=auth) + + assert response.status_code == 401 + assert response.json() == {"auth": None} + assert len(response.history) == 0 + + +@pytest.mark.parametrize( + "algorithm,expected_hash_length,expected_response_length", + [ + ("MD5", 64, 32), + ("MD5-SESS", 64, 32), + ("SHA", 64, 40), + ("SHA-SESS", 64, 40), + ("SHA-256", 64, 64), + ("SHA-256-SESS", 64, 64), + ("SHA-512", 64, 128), + ("SHA-512-SESS", 64, 128), + ], +) +def test_digest_auth( + algorithm: str, expected_hash_length: int, expected_response_length: int +) -> None: + url = "https://example.org/" + auth = httpx.DigestAuth(username="user", password="password123") + app = DigestApp(algorithm=algorithm) + + with httpx.Client(transport=httpx.MockTransport(app)) as client: + response = client.get(url, auth=auth) + + assert response.status_code == 200 + assert len(response.history) == 1 + + authorization = typing.cast(typing.Dict[str, typing.Any], response.json())["auth"] + scheme, _, fields = authorization.partition(" ") + assert scheme == "Digest" + + response_fields = [field.strip() for field in fields.split(",")] + digest_data = dict(field.split("=") for field in response_fields) + + assert digest_data["username"] == '"user"' + assert digest_data["realm"] == '"httpx@example.org"' + assert "nonce" in digest_data + assert digest_data["uri"] == '"/"' + assert len(digest_data["response"]) == expected_response_length + 2 # extra quotes + assert len(digest_data["opaque"]) == expected_hash_length + 2 + assert digest_data["algorithm"] == algorithm + assert digest_data["qop"] == "auth" + assert digest_data["nc"] == "00000001" + assert len(digest_data["cnonce"]) == 16 + 2 + + +def test_digest_auth_no_specified_qop() -> None: + url = "https://example.org/" + auth = httpx.DigestAuth(username="user", password="password123") + app = DigestApp(qop="") + + with httpx.Client(transport=httpx.MockTransport(app)) as client: + response = client.get(url, auth=auth) + + assert response.status_code == 200 + assert len(response.history) == 1 + + authorization = typing.cast(typing.Dict[str, typing.Any], response.json())["auth"] + scheme, _, fields = authorization.partition(" ") + assert scheme == "Digest" + + response_fields = [field.strip() for field in fields.split(",")] + digest_data = dict(field.split("=") for field in response_fields) + + assert "qop" not in digest_data + assert "nc" not in digest_data + assert "cnonce" not in digest_data + assert digest_data["username"] == '"user"' + assert digest_data["realm"] == '"httpx@example.org"' + assert len(digest_data["nonce"]) == 64 + 2 # extra quotes + assert digest_data["uri"] == '"/"' + assert len(digest_data["response"]) == 64 + 2 + assert len(digest_data["opaque"]) == 64 + 2 + assert digest_data["algorithm"] == "SHA-256" + + +@pytest.mark.parametrize("qop", ("auth, auth-int", "auth,auth-int", "unknown,auth")) +def test_digest_auth_qop_including_spaces_and_auth_returns_auth(qop: str) -> None: + url = "https://example.org/" + auth = httpx.DigestAuth(username="user", password="password123") + app = DigestApp(qop=qop) + + with httpx.Client(transport=httpx.MockTransport(app)) as client: + response = client.get(url, auth=auth) + + assert response.status_code == 200 + assert len(response.history) == 1 + + +def test_digest_auth_qop_auth_int_not_implemented() -> None: + url = "https://example.org/" + auth = httpx.DigestAuth(username="user", password="password123") + app = DigestApp(qop="auth-int") + + with httpx.Client(transport=httpx.MockTransport(app)) as client: + with pytest.raises(NotImplementedError): + client.get(url, auth=auth) + + +def test_digest_auth_qop_must_be_auth_or_auth_int() -> None: + url = "https://example.org/" + auth = httpx.DigestAuth(username="user", password="password123") + app = DigestApp(qop="not-auth") + + with httpx.Client(transport=httpx.MockTransport(app)) as client: + with pytest.raises(httpx.ProtocolError): + client.get(url, auth=auth) + + +def test_digest_auth_incorrect_credentials() -> None: + url = "https://example.org/" + auth = httpx.DigestAuth(username="user", password="password123") + app = DigestApp(send_response_after_attempt=2) + + with httpx.Client(transport=httpx.MockTransport(app)) as client: + response = client.get(url, auth=auth) + + assert response.status_code == 401 + assert len(response.history) == 1 + + +def test_digest_auth_reuses_challenge() -> None: + url = "https://example.org/" + auth = httpx.DigestAuth(username="user", password="password123") + app = DigestApp() + + with httpx.Client(transport=httpx.MockTransport(app)) as client: + response_1 = client.get(url, auth=auth) + response_2 = client.get(url, auth=auth) + + assert response_1.status_code == 200 + assert response_2.status_code == 200 + + assert len(response_1.history) == 1 + assert len(response_2.history) == 0 + + +def test_digest_auth_resets_nonce_count_after_401() -> None: + url = "https://example.org/" + auth = httpx.DigestAuth(username="user", password="password123") + app = DigestApp() + + with httpx.Client(transport=httpx.MockTransport(app)) as client: + response_1 = client.get(url, auth=auth) + assert response_1.status_code == 200 + assert len(response_1.history) == 1 + + first_nonce = parse_keqv_list( + response_1.request.headers["Authorization"].split(", ") + )["nonce"] + first_nc = parse_keqv_list( + response_1.request.headers["Authorization"].split(", ") + )["nc"] + + # with this we now force a 401 on a subsequent (but initial) request + app.send_response_after_attempt = 2 + + # we expect the client again to try to authenticate, + # i.e. the history length must be 1 + response_2 = client.get(url, auth=auth) + assert response_2.status_code == 200 + assert len(response_2.history) == 1 + + second_nonce = parse_keqv_list( + response_2.request.headers["Authorization"].split(", ") + )["nonce"] + second_nc = parse_keqv_list( + response_2.request.headers["Authorization"].split(", ") + )["nc"] + + assert first_nonce != second_nonce # ensures that the auth challenge was reset + assert ( + first_nc == second_nc + ) # ensures the nonce count is reset when the authentication failed + + +@pytest.mark.parametrize( + "auth_header", + [ + 'Digest realm="httpx@example.org", qop="auth"', # missing fields + 'Digest realm="httpx@example.org", qop="auth,au', # malformed fields list + ], +) +def test_digest_auth_raises_protocol_error_on_malformed_header( + auth_header: str, +) -> None: + url = "https://example.org/" + auth = httpx.DigestAuth(username="user", password="password123") + app = App(auth_header=auth_header, status_code=401) + + with httpx.Client(transport=httpx.MockTransport(app)) as client: + with pytest.raises(httpx.ProtocolError): + client.get(url, auth=auth) + + +def test_auth_history() -> None: + """ + Test that intermediate requests sent as part of an authentication flow + are recorded in the response history. + """ + url = "https://example.org/" + auth = RepeatAuth(repeat=2) + app = App(auth_header="abc") + + with httpx.Client(transport=httpx.MockTransport(app)) as client: + response = client.get(url, auth=auth) + + assert response.status_code == 200 + assert response.json() == {"auth": "Repeat abc.abc"} + + assert len(response.history) == 2 + resp1, resp2 = response.history + assert resp1.json() == {"auth": "Repeat 0"} + assert resp2.json() == {"auth": "Repeat 1"} + + assert len(resp2.history) == 1 + assert resp2.history == [resp1] + + assert len(resp1.history) == 0 + + +class ConsumeBodyTransport(httpx.MockTransport): + def handle_request(self, request: httpx.Request) -> httpx.Response: + assert isinstance(request.stream, httpx.SyncByteStream) + for _ in request.stream: + pass + return self.handler(request) # type: ignore[return-value] + + +def test_digest_auth_unavailable_streaming_body(): + url = "https://example.org/" + auth = httpx.DigestAuth(username="user", password="password123") + app = DigestApp() + + def streaming_body() -> typing.Iterator[bytes]: + yield b"Example request body" # pragma: no cover + + with httpx.Client(transport=ConsumeBodyTransport(app)) as client: + with pytest.raises(httpx.StreamConsumed): + client.post(url, content=streaming_body(), auth=auth) + + +def test_auth_reads_response_body() -> None: + """ + Test that we can read the response body in an auth flow if `requires_response_body` + is set. + """ + url = "https://example.org/" + auth = ResponseBodyAuth("xyz") + app = App() + + with httpx.Client(transport=httpx.MockTransport(app)) as client: + response = client.get(url, auth=auth) + + assert response.status_code == 200 + assert response.json() == {"auth": '{"auth":"xyz"}'} + + +def test_auth() -> None: + """ + Test that we can use an auth implementation specific to the async case, to + support cases that require performing I/O or using concurrency primitives (such + as checking a disk-based cache or fetching a token from a remote auth server). + """ + url = "https://example.org/" + auth = Auth() + app = App() + + with httpx.Client(transport=httpx.MockTransport(app)) as client: + response = client.get(url, auth=auth) + + assert response.status_code == 200 + assert response.json() == {"auth": "auth"} diff --git a/tests/client/sync/test_client.py b/tests/client/sync/test_client.py new file mode 100644 index 00000000..fc267ad1 --- /dev/null +++ b/tests/client/sync/test_client.py @@ -0,0 +1,339 @@ +from __future__ import annotations + +import typing +from datetime import timedelta + +import pytest + +import httpx + + +def test_get(server): + url = server.url + with httpx.Client(http2=True) as client: + response = client.get(url) + assert response.status_code == 200 + assert response.text == "Hello, world!" + assert response.http_version == "HTTP/1.1" + assert response.headers + assert repr(response) == "" + assert response.elapsed > timedelta(seconds=0) + + +@pytest.mark.parametrize( + "url", + [ + pytest.param("invalid://example.org", id="scheme-not-http(s)"), + pytest.param("://example.org", id="no-scheme"), + pytest.param("http://", id="no-host"), + ], +) +def test_get_invalid_url(server, url): + with httpx.Client() as client: + with pytest.raises((httpx.UnsupportedProtocol, httpx.LocalProtocolError)): + client.get(url) + + +def test_build_request(server): + url = server.url.copy_with(path="/echo_headers") + headers = {"Custom-header": "value"} + with httpx.Client() as client: + request = client.build_request("GET", url) + request.headers.update(headers) + response = client.send(request) + + assert response.status_code == 200 + assert response.url == url + + assert response.json()["Custom-header"] == "value" + + +def test_post(server): + url = server.url + with httpx.Client() as client: + response = client.post(url, content=b"Hello, world!") + assert response.status_code == 200 + + +def test_post_json(server): + url = server.url + with httpx.Client() as client: + response = client.post(url, json={"text": "Hello, world!"}) + assert response.status_code == 200 + + +def test_stream_response(server): + with httpx.Client() as client: + with client.stream("GET", server.url) as response: + body = response.read() + + assert response.status_code == 200 + assert body == b"Hello, world!" + assert response.content == b"Hello, world!" + + +def test_access_content_stream_response(server): + with httpx.Client() as client: + with client.stream("GET", server.url) as response: + pass + + assert response.status_code == 200 + with pytest.raises(httpx.ResponseNotRead): + response.content # noqa: B018 + + +def test_stream_request(server): + def hello_world() -> typing.Iterator[bytes]: + yield b"Hello, " + yield b"world!" + + with httpx.Client() as client: + response = client.post(server.url, content=hello_world()) + assert response.status_code == 200 + + +def test_raise_for_status(server): + with httpx.Client() as client: + for status_code in (200, 400, 404, 500, 505): + response = client.request( + "GET", server.url.copy_with(path=f"/status/{status_code}") + ) + + if 400 <= status_code < 600: + with pytest.raises(httpx.HTTPStatusError) as exc_info: + response.raise_for_status() + assert exc_info.value.response == response + else: + assert response.raise_for_status() is response + + +def test_options(server): + with httpx.Client() as client: + response = client.options(server.url) + assert response.status_code == 200 + assert response.text == "Hello, world!" + + +def test_head(server): + with httpx.Client() as client: + response = client.head(server.url) + assert response.status_code == 200 + assert response.text == "" + + +def test_put(server): + with httpx.Client() as client: + response = client.put(server.url, content=b"Hello, world!") + assert response.status_code == 200 + + +def test_patch(server): + with httpx.Client() as client: + response = client.patch(server.url, content=b"Hello, world!") + assert response.status_code == 200 + + +def test_delete(server): + with httpx.Client() as client: + response = client.delete(server.url) + assert response.status_code == 200 + assert response.text == "Hello, world!" + + +def test_100_continue(server): + headers = {"Expect": "100-continue"} + content = b"Echo request body" + + with httpx.Client() as client: + response = client.post( + server.url.copy_with(path="/echo_body"), headers=headers, content=content + ) + + assert response.status_code == 200 + assert response.content == content + + +def test_context_managed_transport(): + class Transport(httpx.BaseTransport): + def __init__(self) -> None: + self.events: list[str] = [] + + def close(self): + # The base implementation of httpx.BaseTransport just + # calls into `.close`, so simple transport cases can just override + # this method for any cleanup, where more complex cases + # might want to additionally override `__enter__`/`__exit__`. + self.events.append("transport.close") + + def __enter__(self): + super().__enter__() + self.events.append("transport.__enter__") + + def __exit__(self, *args): + super().__exit__(*args) + self.events.append("transport.__exit__") + + transport = Transport() + with httpx.Client(transport=transport): + pass + + assert transport.events == [ + "transport.__enter__", + "transport.close", + "transport.__exit__", + ] + + +def test_context_managed_transport_and_mount(): + class Transport(httpx.BaseTransport): + def __init__(self, name: str) -> None: + self.name: str = name + self.events: list[str] = [] + + def close(self): + # The base implementation of httpx.BaseTransport just + # calls into `.close`, so simple transport cases can just override + # this method for any cleanup, where more complex cases + # might want to additionally override `__enter__`/`__exit__`. + self.events.append(f"{self.name}.close") + + def __enter__(self): + super().__enter__() + self.events.append(f"{self.name}.__enter__") + + def __exit__(self, *args): + super().__exit__(*args) + self.events.append(f"{self.name}.__exit__") + + transport = Transport(name="transport") + mounted = Transport(name="mounted") + with httpx.Client(transport=transport, mounts={"http://www.example.org": mounted}): + pass + + assert transport.events == [ + "transport.__enter__", + "transport.close", + "transport.__exit__", + ] + assert mounted.events == [ + "mounted.__enter__", + "mounted.close", + "mounted.__exit__", + ] + + +def hello_world(request): + return httpx.Response(200, text="Hello, world!") + + +def test_client_closed_state_using_implicit_open(): + client = httpx.Client(transport=httpx.MockTransport(hello_world)) + + assert not client.is_closed + client.get("http://example.com") + + assert not client.is_closed + client.close() + + assert client.is_closed + # Once we're close we cannot make any more requests. + with pytest.raises(RuntimeError): + client.get("http://example.com") + + # Once we're closed we cannot reopen the client. + with pytest.raises(RuntimeError): + with client: + pass # pragma: no cover + + +def test_client_closed_state_using_with_block(): + with httpx.Client(transport=httpx.MockTransport(hello_world)) as client: + assert not client.is_closed + client.get("http://example.com") + + assert client.is_closed + with pytest.raises(RuntimeError): + client.get("http://example.com") + + +def unmounted(request: httpx.Request) -> httpx.Response: + data = {"app": "unmounted"} + return httpx.Response(200, json=data) + + +def mounted(request: httpx.Request) -> httpx.Response: + data = {"app": "mounted"} + return httpx.Response(200, json=data) + + +def test_mounted_transport(): + transport = httpx.MockTransport(unmounted) + mounts = {"custom://": httpx.MockTransport(mounted)} + + with httpx.Client(transport=transport, mounts=mounts) as client: + response = client.get("https://www.example.com") + assert response.status_code == 200 + assert response.json() == {"app": "unmounted"} + + response = client.get("custom://www.example.com") + assert response.status_code == 200 + assert response.json() == {"app": "mounted"} + + +def test_mock_transport(): + def hello_world(request: httpx.Request) -> httpx.Response: + return httpx.Response(200, text="Hello, world!") + + transport = httpx.MockTransport(hello_world) + + with httpx.Client(transport=transport) as client: + response = client.get("https://www.example.com") + assert response.status_code == 200 + assert response.text == "Hello, world!" + + +def test_cancellation_during_stream(): + """ + If any BaseException is raised during streaming the response, then the + stream should be closed. + + This includes: + + * `asyncio.CancelledError` (A subclass of BaseException from Python 3.8 onwards.) + * `trio.Cancelled` + * `KeyboardInterrupt` + * `SystemExit` + + See https://github.com/encode/httpx/issues/2139 + """ + stream_was_closed = False + + def response_with_cancel_during_stream(request): + class CancelledStream(httpx.SyncByteStream): + def __iter__(self) -> typing.Iterator[bytes]: + yield b"Hello" + raise KeyboardInterrupt() + yield b", world" # pragma: no cover + + def close(self) -> None: + nonlocal stream_was_closed + stream_was_closed = True + + return httpx.Response( + 200, headers={"Content-Length": "12"}, stream=CancelledStream() + ) + + transport = httpx.MockTransport(response_with_cancel_during_stream) + + with httpx.Client(transport=transport) as client: + with pytest.raises(KeyboardInterrupt): + client.get("https://www.example.com") + assert stream_was_closed + + +def test_server_extensions(server): + url = server.url + with httpx.Client(http2=True) as client: + response = client.get(url) + assert response.status_code == 200 + assert response.extensions["http_version"] == b"HTTP/1.1" diff --git a/tests/client/sync/test_cookies.py b/tests/client/sync/test_cookies.py new file mode 100644 index 00000000..35452b3e --- /dev/null +++ b/tests/client/sync/test_cookies.py @@ -0,0 +1,167 @@ +from http.cookiejar import Cookie, CookieJar + +import pytest + +import httpx + + +def get_and_set_cookies(request: httpx.Request) -> httpx.Response: + if request.url.path == "/echo_cookies": + data = {"cookies": request.headers.get("cookie")} + return httpx.Response(200, json=data) + elif request.url.path == "/set_cookie": + return httpx.Response(200, headers={"set-cookie": "example-name=example-value"}) + else: + raise NotImplementedError() # pragma: no cover + + +def test_set_cookie() -> None: + """ + Send a request including a cookie. + """ + url = "http://example.org/echo_cookies" + cookies = {"example-name": "example-value"} + + with httpx.Client( + cookies=cookies, transport=httpx.MockTransport(get_and_set_cookies) + ) as client: + response = client.get(url) + + assert response.status_code == 200 + assert response.json() == {"cookies": "example-name=example-value"} + + +def test_set_per_request_cookie_is_deprecated() -> None: + """ + Sending a request including a per-request cookie is deprecated. + """ + url = "http://example.org/echo_cookies" + cookies = {"example-name": "example-value"} + + with httpx.Client(transport=httpx.MockTransport(get_and_set_cookies)) as client: + with pytest.warns(DeprecationWarning): + response = client.get(url, cookies=cookies) + + assert response.status_code == 200 + assert response.json() == {"cookies": "example-name=example-value"} + + +def test_set_cookie_with_cookiejar() -> None: + """ + Send a request including a cookie, using a `CookieJar` instance. + """ + + url = "http://example.org/echo_cookies" + cookies = CookieJar() + cookie = Cookie( + version=0, + name="example-name", + value="example-value", + port=None, + port_specified=False, + domain="", + domain_specified=False, + domain_initial_dot=False, + path="/", + path_specified=True, + secure=False, + expires=None, + discard=True, + comment=None, + comment_url=None, + rest={"HttpOnly": ""}, + rfc2109=False, + ) + cookies.set_cookie(cookie) + + with httpx.Client( + cookies=cookies, transport=httpx.MockTransport(get_and_set_cookies) + ) as client: + response = client.get(url) + + assert response.status_code == 200 + assert response.json() == {"cookies": "example-name=example-value"} + + +def test_setting_client_cookies_to_cookiejar() -> None: + """ + Send a request including a cookie, using a `CookieJar` instance. + """ + + url = "http://example.org/echo_cookies" + cookies = CookieJar() + cookie = Cookie( + version=0, + name="example-name", + value="example-value", + port=None, + port_specified=False, + domain="", + domain_specified=False, + domain_initial_dot=False, + path="/", + path_specified=True, + secure=False, + expires=None, + discard=True, + comment=None, + comment_url=None, + rest={"HttpOnly": ""}, + rfc2109=False, + ) + cookies.set_cookie(cookie) + + with httpx.Client( + cookies=cookies, transport=httpx.MockTransport(get_and_set_cookies) + ) as client: + response = client.get(url) + + assert response.status_code == 200 + assert response.json() == {"cookies": "example-name=example-value"} + + +def test_set_cookie_with_cookies_model() -> None: + """ + Send a request including a cookie, using a `Cookies` instance. + """ + + url = "http://example.org/echo_cookies" + cookies = httpx.Cookies() + cookies["example-name"] = "example-value" + + with httpx.Client(transport=httpx.MockTransport(get_and_set_cookies)) as client: + client.cookies = cookies + response = client.get(url) + + assert response.status_code == 200 + assert response.json() == {"cookies": "example-name=example-value"} + + +def test_get_cookie() -> None: + url = "http://example.org/set_cookie" + + with httpx.Client(transport=httpx.MockTransport(get_and_set_cookies)) as client: + response = client.get(url) + + assert response.status_code == 200 + assert response.cookies["example-name"] == "example-value" + assert client.cookies["example-name"] == "example-value" + + +def test_cookie_persistence() -> None: + """ + Ensure that Client instances persist cookies between requests. + """ + with httpx.Client(transport=httpx.MockTransport(get_and_set_cookies)) as client: + response = client.get("http://example.org/echo_cookies") + assert response.status_code == 200 + assert response.json() == {"cookies": None} + + response = client.get("http://example.org/set_cookie") + assert response.status_code == 200 + assert response.cookies["example-name"] == "example-value" + assert client.cookies["example-name"] == "example-value" + + response = client.get("http://example.org/echo_cookies") + assert response.status_code == 200 + assert response.json() == {"cookies": "example-name=example-value"} diff --git a/tests/client/sync/test_event_hooks.py b/tests/client/sync/test_event_hooks.py new file mode 100644 index 00000000..df96199e --- /dev/null +++ b/tests/client/sync/test_event_hooks.py @@ -0,0 +1,117 @@ +import httpx + + +def app(request: httpx.Request) -> httpx.Response: + if request.url.path == "/redirect": + return httpx.Response(303, headers={"server": "testserver", "location": "/"}) + elif request.url.path.startswith("/status/"): + status_code = int(request.url.path[-3:]) + return httpx.Response(status_code, headers={"server": "testserver"}) + + return httpx.Response(200, headers={"server": "testserver"}) + + +def test_event_hooks(): + events = [] + + def on_request(request): + events.append({"event": "request", "headers": dict(request.headers)}) + + def on_response(response): + events.append({"event": "response", "headers": dict(response.headers)}) + + event_hooks = {"request": [on_request], "response": [on_response]} + + with httpx.Client( + event_hooks=event_hooks, transport=httpx.MockTransport(app) + ) as http: + http.get("http://127.0.0.1:8000/", auth=("username", "password")) + + assert events == [ + { + "event": "request", + "headers": { + "host": "127.0.0.1:8000", + "user-agent": f"python-httpx/{httpx.__version__}", + "accept": "*/*", + "accept-encoding": "gzip, deflate, br, zstd", + "connection": "keep-alive", + "authorization": "Basic dXNlcm5hbWU6cGFzc3dvcmQ=", + }, + }, + { + "event": "response", + "headers": {"server": "testserver"}, + }, + ] + + +def test_event_hooks_raising_exception(): + def raise_on_4xx_5xx(response): + response.raise_for_status() + + event_hooks = {"response": [raise_on_4xx_5xx]} + + with httpx.Client( + event_hooks=event_hooks, transport=httpx.MockTransport(app) + ) as http: + try: + http.get("http://127.0.0.1:8000/status/400") + except httpx.HTTPStatusError as exc: + assert exc.response.is_closed + + +def test_event_hooks_with_redirect(): + """ + A redirect request should trigger additional 'request' and 'response' event hooks. + """ + + events = [] + + def on_request(request): + events.append({"event": "request", "headers": dict(request.headers)}) + + def on_response(response): + events.append({"event": "response", "headers": dict(response.headers)}) + + event_hooks = {"request": [on_request], "response": [on_response]} + + with httpx.Client( + event_hooks=event_hooks, + transport=httpx.MockTransport(app), + follow_redirects=True, + ) as http: + http.get("http://127.0.0.1:8000/redirect", auth=("username", "password")) + + assert events == [ + { + "event": "request", + "headers": { + "host": "127.0.0.1:8000", + "user-agent": f"python-httpx/{httpx.__version__}", + "accept": "*/*", + "accept-encoding": "gzip, deflate, br, zstd", + "connection": "keep-alive", + "authorization": "Basic dXNlcm5hbWU6cGFzc3dvcmQ=", + }, + }, + { + "event": "response", + "headers": {"location": "/", "server": "testserver"}, + }, + { + "event": "request", + "headers": { + "host": "127.0.0.1:8000", + "user-agent": f"python-httpx/{httpx.__version__}", + "accept": "*/*", + "accept-encoding": "gzip, deflate, br, zstd", + "connection": "keep-alive", + "authorization": "Basic dXNlcm5hbWU6cGFzc3dvcmQ=", + }, + }, + { + "event": "response", + "headers": {"server": "testserver"}, + }, + ] diff --git a/tests/client/sync/test_headers.py b/tests/client/sync/test_headers.py new file mode 100644 index 00000000..a9e296ad --- /dev/null +++ b/tests/client/sync/test_headers.py @@ -0,0 +1,297 @@ +#!/usr/bin/env python3 + +import pytest + +import httpx + + +def echo_headers(request: httpx.Request) -> httpx.Response: + data = {"headers": dict(request.headers)} + return httpx.Response(200, json=data) + + +def echo_repeated_headers_multi_items(request: httpx.Request) -> httpx.Response: + data = {"headers": list(request.headers.multi_items())} + return httpx.Response(200, json=data) + + +def echo_repeated_headers_items(request: httpx.Request) -> httpx.Response: + data = {"headers": list(request.headers.items())} + return httpx.Response(200, json=data) + + +def test_client_header(): + """ + Set a header in the Client. + """ + url = "http://example.org/echo_headers" + headers = {"Example-Header": "example-value"} + + with httpx.Client( + transport=httpx.MockTransport(echo_headers), headers=headers + ) as client: + response = client.get(url) + + assert response.status_code == 200 + assert response.json() == { + "headers": { + "accept": "*/*", + "accept-encoding": "gzip, deflate, br, zstd", + "connection": "keep-alive", + "example-header": "example-value", + "host": "example.org", + "user-agent": f"python-httpx/{httpx.__version__}", + } + } + + +def test_header_merge(): + url = "http://example.org/echo_headers" + client_headers = {"User-Agent": "python-myclient/0.2.1"} + request_headers = {"X-Auth-Token": "FooBarBazToken"} + with httpx.Client( + transport=httpx.MockTransport(echo_headers), headers=client_headers + ) as client: + response = client.get(url, headers=request_headers) + + assert response.status_code == 200 + assert response.json() == { + "headers": { + "accept": "*/*", + "accept-encoding": "gzip, deflate, br, zstd", + "connection": "keep-alive", + "host": "example.org", + "user-agent": "python-myclient/0.2.1", + "x-auth-token": "FooBarBazToken", + } + } + + +def test_header_merge_conflicting_headers(): + url = "http://example.org/echo_headers" + client_headers = {"X-Auth-Token": "FooBar"} + request_headers = {"X-Auth-Token": "BazToken"} + with httpx.Client( + transport=httpx.MockTransport(echo_headers), headers=client_headers + ) as client: + response = client.get(url, headers=request_headers) + + assert response.status_code == 200 + assert response.json() == { + "headers": { + "accept": "*/*", + "accept-encoding": "gzip, deflate, br, zstd", + "connection": "keep-alive", + "host": "example.org", + "user-agent": f"python-httpx/{httpx.__version__}", + "x-auth-token": "BazToken", + } + } + + +def test_header_update(): + url = "http://example.org/echo_headers" + with httpx.Client(transport=httpx.MockTransport(echo_headers)) as client: + first_response = client.get(url) + client.headers.update( + {"User-Agent": "python-myclient/0.2.1", "Another-Header": "AThing"} + ) + second_response = client.get(url) + + assert first_response.status_code == 200 + assert first_response.json() == { + "headers": { + "accept": "*/*", + "accept-encoding": "gzip, deflate, br, zstd", + "connection": "keep-alive", + "host": "example.org", + "user-agent": f"python-httpx/{httpx.__version__}", + } + } + + assert second_response.status_code == 200 + assert second_response.json() == { + "headers": { + "accept": "*/*", + "accept-encoding": "gzip, deflate, br, zstd", + "another-header": "AThing", + "connection": "keep-alive", + "host": "example.org", + "user-agent": "python-myclient/0.2.1", + } + } + + +def test_header_repeated_items(): + url = "http://example.org/echo_headers" + with httpx.Client( + transport=httpx.MockTransport(echo_repeated_headers_items) + ) as client: + response = client.get(url, headers=[("x-header", "1"), ("x-header", "2,3")]) + + assert response.status_code == 200 + + echoed_headers = response.json()["headers"] + # as per RFC 7230, the whitespace after a comma is insignificant + # so we split and strip here so that we can do a safe comparison + assert ["x-header", ["1", "2", "3"]] in [ + [k, [subv.lstrip() for subv in v.split(",")]] for k, v in echoed_headers + ] + + +def test_header_repeated_multi_items(): + url = "http://example.org/echo_headers" + with httpx.Client( + transport=httpx.MockTransport(echo_repeated_headers_multi_items) + ) as client: + response = client.get(url, headers=[("x-header", "1"), ("x-header", "2,3")]) + + assert response.status_code == 200 + + echoed_headers = response.json()["headers"] + assert ["x-header", "1"] in echoed_headers + assert ["x-header", "2,3"] in echoed_headers + + +def test_remove_default_header(): + """ + Remove a default header from the Client. + """ + url = "http://example.org/echo_headers" + + with httpx.Client(transport=httpx.MockTransport(echo_headers)) as client: + del client.headers["User-Agent"] + + response = client.get(url) + + assert response.status_code == 200 + assert response.json() == { + "headers": { + "accept": "*/*", + "accept-encoding": "gzip, deflate, br, zstd", + "connection": "keep-alive", + "host": "example.org", + } + } + + +def test_header_does_not_exist(): + headers = httpx.Headers({"foo": "bar"}) + with pytest.raises(KeyError): + del headers["baz"] + + +def test_header_with_incorrect_value(): + with pytest.raises( + TypeError, + match=f"Header value must be str or bytes, not {type(None)}", + ): + httpx.Headers({"foo": None}) # type: ignore + + +def test_host_with_auth_and_port_in_url(): + """ + The Host header should only include the hostname, or hostname:port + (for non-default ports only). Any userinfo or default port should not + be present. + """ + url = "http://username:password@example.org:80/echo_headers" + + with httpx.Client(transport=httpx.MockTransport(echo_headers)) as client: + response = client.get(url) + + assert response.status_code == 200 + assert response.json() == { + "headers": { + "accept": "*/*", + "accept-encoding": "gzip, deflate, br, zstd", + "connection": "keep-alive", + "host": "example.org", + "user-agent": f"python-httpx/{httpx.__version__}", + "authorization": "Basic dXNlcm5hbWU6cGFzc3dvcmQ=", + } + } + + +def test_host_with_non_default_port_in_url(): + """ + If the URL includes a non-default port, then it should be included in + the Host header. + """ + url = "http://username:password@example.org:123/echo_headers" + + with httpx.Client(transport=httpx.MockTransport(echo_headers)) as client: + response = client.get(url) + + assert response.status_code == 200 + assert response.json() == { + "headers": { + "accept": "*/*", + "accept-encoding": "gzip, deflate, br, zstd", + "connection": "keep-alive", + "host": "example.org:123", + "user-agent": f"python-httpx/{httpx.__version__}", + "authorization": "Basic dXNlcm5hbWU6cGFzc3dvcmQ=", + } + } + + +def test_request_auto_headers(): + request = httpx.Request("GET", "https://www.example.org/") + assert "host" in request.headers + + +def test_same_origin(): + origin = httpx.URL("https://example.com") + request = httpx.Request("GET", "HTTPS://EXAMPLE.COM:443") + + with httpx.Client() as client: + headers = client._redirect_headers(request, origin, "GET") + + assert headers["Host"] == request.url.netloc.decode("ascii") + + +def test_not_same_origin(): + origin = httpx.URL("https://example.com") + request = httpx.Request("GET", "HTTP://EXAMPLE.COM:80") + + with httpx.Client() as client: + headers = client._redirect_headers(request, origin, "GET") + + assert headers["Host"] == origin.netloc.decode("ascii") + + +def test_is_https_redirect(): + url = httpx.URL("https://example.com") + request = httpx.Request( + "GET", "http://example.com", headers={"Authorization": "empty"} + ) + + with httpx.Client() as client: + headers = client._redirect_headers(request, url, "GET") + + assert "Authorization" in headers + + +def test_is_not_https_redirect(): + url = httpx.URL("https://www.example.com") + request = httpx.Request( + "GET", "http://example.com", headers={"Authorization": "empty"} + ) + + with httpx.Client() as client: + headers = client._redirect_headers(request, url, "GET") + + assert "Authorization" not in headers + + +def test_is_not_https_redirect_if_not_default_ports(): + url = httpx.URL("https://example.com:1337") + request = httpx.Request( + "GET", "http://example.com:9999", headers={"Authorization": "empty"} + ) + + with httpx.Client() as client: + headers = client._redirect_headers(request, url, "GET") + + assert "Authorization" not in headers diff --git a/tests/client/sync/test_properties.py b/tests/client/sync/test_properties.py new file mode 100644 index 00000000..83f85f0e --- /dev/null +++ b/tests/client/sync/test_properties.py @@ -0,0 +1,67 @@ +import httpx + + +def test_client_base_url(): + with httpx.Client() as client: + client.base_url = "https://www.example.org/" # type: ignore + assert isinstance(client.base_url, httpx.URL) + assert client.base_url == "https://www.example.org/" + + +def test_client_base_url_without_trailing_slash(): + with httpx.Client() as client: + client.base_url = "https://www.example.org/path" # type: ignore + assert isinstance(client.base_url, httpx.URL) + assert client.base_url == "https://www.example.org/path/" + + +def test_client_base_url_with_trailing_slash(): + client = httpx.Client() + client.base_url = "https://www.example.org/path/" # type: ignore + assert isinstance(client.base_url, httpx.URL) + assert client.base_url == "https://www.example.org/path/" + + +def test_client_headers(): + with httpx.Client() as client: + client.headers = {"a": "b"} # type: ignore + assert isinstance(client.headers, httpx.Headers) + assert client.headers["A"] == "b" + + +def test_client_cookies(): + with httpx.Client() as client: + client.cookies = {"a": "b"} # type: ignore + assert isinstance(client.cookies, httpx.Cookies) + mycookies = list(client.cookies.jar) + assert len(mycookies) == 1 + assert mycookies[0].name == "a" and mycookies[0].value == "b" + + +def test_client_timeout(): + expected_timeout = 12.0 + with httpx.Client() as client: + client.timeout = expected_timeout # type: ignore + + assert isinstance(client.timeout, httpx.Timeout) + assert client.timeout.connect == expected_timeout + assert client.timeout.read == expected_timeout + assert client.timeout.write == expected_timeout + assert client.timeout.pool == expected_timeout + + +def test_client_event_hooks(): + def on_request(request): + pass # pragma: no cover + + with httpx.Client() as client: + client.event_hooks = {"request": [on_request]} + assert client.event_hooks == {"request": [on_request], "response": []} + + +def test_client_trust_env(): + with httpx.Client() as client: + assert client.trust_env + + with httpx.Client(trust_env=False) as client: + assert not client.trust_env diff --git a/tests/client/sync/test_proxies.py b/tests/client/sync/test_proxies.py new file mode 100644 index 00000000..2aa5d92b --- /dev/null +++ b/tests/client/sync/test_proxies.py @@ -0,0 +1,247 @@ +import httpcore +import pytest + +import httpx + + +def url_to_origin(url: str) -> httpcore.URL: + """ + Given a URL string, return the origin in the raw tuple format that + `httpcore` uses for it's representation. + """ + u = httpx.URL(url) + return httpcore.URL(scheme=u.raw_scheme, host=u.raw_host, port=u.port, target="/") + + +def test_socks_proxy(): + url = httpx.URL("http://www.example.com") + + for proxy in ("socks5://localhost/", "socks5h://localhost/"): + with httpx.Client(proxy=proxy) as client: + transport = client._transport_for_url(url) + assert isinstance(transport, httpx.HTTPTransport) + assert isinstance(transport._pool, httpcore.SOCKSProxy) + + +PROXY_URL = "http://[::1]" + + +@pytest.mark.parametrize( + ["url", "proxies", "expected"], + [ + ("http://example.com", {}, None), + ("http://example.com", {"https://": PROXY_URL}, None), + ("http://example.com", {"http://example.net": PROXY_URL}, None), + # Using "*" should match any domain name. + ("http://example.com", {"http://*": PROXY_URL}, PROXY_URL), + ("https://example.com", {"http://*": PROXY_URL}, None), + # Using "example.com" should match example.com, but not www.example.com + ("http://example.com", {"http://example.com": PROXY_URL}, PROXY_URL), + ("http://www.example.com", {"http://example.com": PROXY_URL}, None), + # Using "*.example.com" should match www.example.com, but not example.com + ("http://example.com", {"http://*.example.com": PROXY_URL}, None), + ("http://www.example.com", {"http://*.example.com": PROXY_URL}, PROXY_URL), + # Using "*example.com" should match example.com and www.example.com + ("http://example.com", {"http://*example.com": PROXY_URL}, PROXY_URL), + ("http://www.example.com", {"http://*example.com": PROXY_URL}, PROXY_URL), + ("http://wwwexample.com", {"http://*example.com": PROXY_URL}, None), + # ... + ("http://example.com:443", {"http://example.com": PROXY_URL}, PROXY_URL), + ("http://example.com", {"all://": PROXY_URL}, PROXY_URL), + ("http://example.com", {"http://": PROXY_URL}, PROXY_URL), + ("http://example.com", {"all://example.com": PROXY_URL}, PROXY_URL), + ("http://example.com", {"http://example.com": PROXY_URL}, PROXY_URL), + ("http://example.com", {"http://example.com:80": PROXY_URL}, PROXY_URL), + ("http://example.com:8080", {"http://example.com:8080": PROXY_URL}, PROXY_URL), + ("http://example.com:8080", {"http://example.com": PROXY_URL}, PROXY_URL), + ( + "http://example.com", + { + "all://": PROXY_URL + ":1", + "http://": PROXY_URL + ":2", + "all://example.com": PROXY_URL + ":3", + "http://example.com": PROXY_URL + ":4", + }, + PROXY_URL + ":4", + ), + ( + "http://example.com", + { + "all://": PROXY_URL + ":1", + "http://": PROXY_URL + ":2", + "all://example.com": PROXY_URL + ":3", + }, + PROXY_URL + ":3", + ), + ( + "http://example.com", + {"all://": PROXY_URL + ":1", "http://": PROXY_URL + ":2"}, + PROXY_URL + ":2", + ), + ], +) +def test_transport_for_request(url, proxies, expected): + mounts = {key: httpx.HTTPTransport(proxy=value) for key, value in proxies.items()} + with httpx.Client(mounts=mounts) as client: + transport = client._transport_for_url(httpx.URL(url)) + + if expected is None: + assert transport is client._transport + else: + assert isinstance(transport, httpx.HTTPTransport) + assert isinstance(transport._pool, httpcore.HTTPProxy) + assert transport._pool._proxy_url == url_to_origin(expected) + + +@pytest.mark.network +def test_proxy_close(): + try: + transport = httpx.HTTPTransport(proxy=PROXY_URL) + client = httpx.Client(mounts={"https://": transport}) + client.get("http://example.com") + finally: + client.close() + + +def test_unsupported_proxy_scheme(): + with pytest.raises(ValueError): + httpx.Client(proxy="ftp://127.0.0.1") + + +@pytest.mark.parametrize( + ["url", "env", "expected"], + [ + ("http://google.com", {}, None), + ( + "http://google.com", + {"HTTP_PROXY": "http://example.com"}, + "http://example.com", + ), + # Auto prepend http scheme + ("http://google.com", {"HTTP_PROXY": "example.com"}, "http://example.com"), + ( + "http://google.com", + {"HTTP_PROXY": "http://example.com", "NO_PROXY": "google.com"}, + None, + ), + # Everything proxied when NO_PROXY is empty/unset + ( + "http://127.0.0.1", + {"ALL_PROXY": "http://localhost:123", "NO_PROXY": ""}, + "http://localhost:123", + ), + # Not proxied if NO_PROXY matches URL. + ( + "http://127.0.0.1", + {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "127.0.0.1"}, + None, + ), + # Proxied if NO_PROXY scheme does not match URL. + ( + "http://127.0.0.1", + {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "https://127.0.0.1"}, + "http://localhost:123", + ), + # Proxied if NO_PROXY scheme does not match host. + ( + "http://127.0.0.1", + {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "1.1.1.1"}, + "http://localhost:123", + ), + # Not proxied if NO_PROXY matches host domain suffix. + ( + "http://courses.mit.edu", + {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu"}, + None, + ), + # Proxied even though NO_PROXY matches host domain *prefix*. + ( + "https://mit.edu.info", + {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu"}, + "http://localhost:123", + ), + # Not proxied if one item in NO_PROXY case matches host domain suffix. + ( + "https://mit.edu.info", + {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu,edu.info"}, + None, + ), + # Not proxied if one item in NO_PROXY case matches host domain suffix. + # May include whitespace. + ( + "https://mit.edu.info", + {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu, edu.info"}, + None, + ), + # Proxied if no items in NO_PROXY match. + ( + "https://mit.edu.info", + {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu,mit.info"}, + "http://localhost:123", + ), + # Proxied if NO_PROXY domain doesn't match. + ( + "https://foo.example.com", + {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "www.example.com"}, + "http://localhost:123", + ), + # Not proxied for subdomains matching NO_PROXY, with a leading ".". + ( + "https://www.example1.com", + {"ALL_PROXY": "http://localhost:123", "NO_PROXY": ".example1.com"}, + None, + ), + # Proxied, because NO_PROXY subdomains only match if "." separated. + ( + "https://www.example2.com", + {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "ample2.com"}, + "http://localhost:123", + ), + # No requests are proxied if NO_PROXY="*" is set. + ( + "https://www.example3.com", + {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "*"}, + None, + ), + ], +) +def test_proxies_environ(monkeypatch, url, env, expected): + for name, value in env.items(): + monkeypatch.setenv(name, value) + + with httpx.Client() as client: + transport = client._transport_for_url(httpx.URL(url)) + + if expected is None: + assert transport == client._transport + else: + assert transport._pool._proxy_url == url_to_origin(expected) # type: ignore + + +@pytest.mark.parametrize( + ["proxies", "is_valid"], + [ + ({"http": "http://127.0.0.1"}, False), + ({"https": "http://127.0.0.1"}, False), + ({"all": "http://127.0.0.1"}, False), + ({"http://": "http://127.0.0.1"}, True), + ({"https://": "http://127.0.0.1"}, True), + ({"all://": "http://127.0.0.1"}, True), + ], +) +def test_for_deprecated_proxy_params(proxies, is_valid): + mounts = {key: httpx.HTTPTransport(proxy=value) for key, value in proxies.items()} + + if not is_valid: + with pytest.raises(ValueError): + httpx.Client(mounts=mounts) + else: + httpx.Client(mounts=mounts) + + +def test_proxy_with_mounts(): + proxy_transport = httpx.HTTPTransport(proxy="http://127.0.0.1") + + with httpx.Client(mounts={"http://": proxy_transport}) as client: + transport = client._transport_for_url(httpx.URL("http://example.com")) + assert transport == proxy_transport diff --git a/tests/client/sync/test_queryparams.py b/tests/client/sync/test_queryparams.py new file mode 100644 index 00000000..0ecdbaba --- /dev/null +++ b/tests/client/sync/test_queryparams.py @@ -0,0 +1,37 @@ +import httpx + + +def hello_world(request: httpx.Request) -> httpx.Response: + return httpx.Response(200, text="Hello, world") + + +def test_client_queryparams(): + client = httpx.Client(params={"a": "b"}) + assert isinstance(client.params, httpx.QueryParams) + assert client.params["a"] == "b" + + +def test_client_queryparams_string(): + with httpx.Client(params="a=b") as client: + assert isinstance(client.params, httpx.QueryParams) + assert client.params["a"] == "b" + + with httpx.Client() as client: + client.params = "a=b" # type: ignore + assert isinstance(client.params, httpx.QueryParams) + assert client.params["a"] == "b" + + +def test_client_queryparams_echo(): + url = "http://example.org/echo_queryparams" + client_queryparams = "first=str" + request_queryparams = {"second": "dict"} + with httpx.Client( + transport=httpx.MockTransport(hello_world), params=client_queryparams + ) as client: + response = client.get(url, params=request_queryparams) + + assert response.status_code == 200 + assert ( + response.url == "http://example.org/echo_queryparams?first=str&second=dict" + ) diff --git a/tests/client/sync/test_redirects.py b/tests/client/sync/test_redirects.py new file mode 100644 index 00000000..c24c9a8b --- /dev/null +++ b/tests/client/sync/test_redirects.py @@ -0,0 +1,426 @@ +import typing + +import pytest + +import httpx + + +def redirects(request: httpx.Request) -> httpx.Response: + if request.url.scheme not in ("http", "https"): + raise httpx.UnsupportedProtocol(f"Scheme {request.url.scheme!r} not supported.") + + if request.url.path == "/redirect_301": + status_code = httpx.codes.MOVED_PERMANENTLY + content = b"here" + headers = {"location": "https://example.org/"} + return httpx.Response(status_code, headers=headers, content=content) + + elif request.url.path == "/redirect_302": + status_code = httpx.codes.FOUND + headers = {"location": "https://example.org/"} + return httpx.Response(status_code, headers=headers) + + elif request.url.path == "/redirect_303": + status_code = httpx.codes.SEE_OTHER + headers = {"location": "https://example.org/"} + return httpx.Response(status_code, headers=headers) + + elif request.url.path == "/relative_redirect": + status_code = httpx.codes.SEE_OTHER + headers = {"location": "/"} + return httpx.Response(status_code, headers=headers) + + elif request.url.path == "/malformed_redirect": + status_code = httpx.codes.SEE_OTHER + headers = {"location": "https://:443/"} + return httpx.Response(status_code, headers=headers) + + elif request.url.path == "/invalid_redirect": + status_code = httpx.codes.SEE_OTHER + raw_headers = [(b"location", "https://😇/".encode("utf-8"))] + return httpx.Response(status_code, headers=raw_headers) + + elif request.url.path == "/no_scheme_redirect": + status_code = httpx.codes.SEE_OTHER + headers = {"location": "//example.org/"} + return httpx.Response(status_code, headers=headers) + + elif request.url.path == "/multiple_redirects": + params = httpx.QueryParams(request.url.query) + count = int(params.get("count", "0")) + redirect_count = count - 1 + status_code = httpx.codes.SEE_OTHER if count else httpx.codes.OK + if count: + location = "/multiple_redirects" + if redirect_count: + location += f"?count={redirect_count}" + headers = {"location": location} + else: + headers = {} + return httpx.Response(status_code, headers=headers) + + if request.url.path == "/redirect_loop": + status_code = httpx.codes.SEE_OTHER + headers = {"location": "/redirect_loop"} + return httpx.Response(status_code, headers=headers) + + elif request.url.path == "/cross_domain": + status_code = httpx.codes.SEE_OTHER + headers = {"location": "https://example.org/cross_domain_target"} + return httpx.Response(status_code, headers=headers) + + elif request.url.path == "/cross_domain_target": + status_code = httpx.codes.OK + data = { + "body": request.content.decode("ascii"), + "headers": dict(request.headers), + } + return httpx.Response(status_code, json=data) + + elif request.url.path == "/redirect_body": + status_code = httpx.codes.PERMANENT_REDIRECT + headers = {"location": "/redirect_body_target"} + return httpx.Response(status_code, headers=headers) + + elif request.url.path == "/redirect_no_body": + status_code = httpx.codes.SEE_OTHER + headers = {"location": "/redirect_body_target"} + return httpx.Response(status_code, headers=headers) + + elif request.url.path == "/redirect_body_target": + data = { + "body": request.content.decode("ascii"), + "headers": dict(request.headers), + } + return httpx.Response(200, json=data) + + elif request.url.path == "/cross_subdomain": + if request.headers["Host"] != "www.example.org": + status_code = httpx.codes.PERMANENT_REDIRECT + headers = {"location": "https://www.example.org/cross_subdomain"} + return httpx.Response(status_code, headers=headers) + else: + return httpx.Response(200, text="Hello, world!") + + elif request.url.path == "/redirect_custom_scheme": + status_code = httpx.codes.MOVED_PERMANENTLY + headers = {"location": "market://details?id=42"} + return httpx.Response(status_code, headers=headers) + + if request.method == "HEAD": + return httpx.Response(200) + + return httpx.Response(200, html="Hello, world!") + + +def test_redirect_301(): + with httpx.Client(transport=httpx.MockTransport(redirects)) as client: + response = client.post( + "https://example.org/redirect_301", follow_redirects=True + ) + assert response.status_code == httpx.codes.OK + assert response.url == "https://example.org/" + assert len(response.history) == 1 + + +def test_redirect_302(): + with httpx.Client(transport=httpx.MockTransport(redirects)) as client: + response = client.post( + "https://example.org/redirect_302", follow_redirects=True + ) + assert response.status_code == httpx.codes.OK + assert response.url == "https://example.org/" + assert len(response.history) == 1 + + +def test_redirect_303(): + with httpx.Client(transport=httpx.MockTransport(redirects)) as client: + response = client.get("https://example.org/redirect_303", follow_redirects=True) + assert response.status_code == httpx.codes.OK + assert response.url == "https://example.org/" + assert len(response.history) == 1 + + +def test_next_request(): + with httpx.Client(transport=httpx.MockTransport(redirects)) as client: + request = client.build_request("POST", "https://example.org/redirect_303") + response = client.send(request, follow_redirects=False) + assert response.status_code == httpx.codes.SEE_OTHER + assert response.url == "https://example.org/redirect_303" + assert response.next_request is not None + + response = client.send(response.next_request, follow_redirects=False) + assert response.status_code == httpx.codes.OK + assert response.url == "https://example.org/" + assert response.next_request is None + + +def test_head_redirect(): + """ + Contrary to Requests, redirects remain enabled by default for HEAD requests. + """ + with httpx.Client(transport=httpx.MockTransport(redirects)) as client: + response = client.head( + "https://example.org/redirect_302", follow_redirects=True + ) + assert response.status_code == httpx.codes.OK + assert response.url == "https://example.org/" + assert response.request.method == "HEAD" + assert len(response.history) == 1 + assert response.text == "" + + +def test_relative_redirect(): + with httpx.Client(transport=httpx.MockTransport(redirects)) as client: + response = client.get( + "https://example.org/relative_redirect", follow_redirects=True + ) + assert response.status_code == httpx.codes.OK + assert response.url == "https://example.org/" + assert len(response.history) == 1 + + +def test_malformed_redirect(): + # https://github.com/encode/httpx/issues/771 + with httpx.Client(transport=httpx.MockTransport(redirects)) as client: + response = client.get( + "http://example.org/malformed_redirect", follow_redirects=True + ) + assert response.status_code == httpx.codes.OK + assert response.url == "https://example.org:443/" + assert len(response.history) == 1 + + +def test_no_scheme_redirect(): + with httpx.Client(transport=httpx.MockTransport(redirects)) as client: + response = client.get( + "https://example.org/no_scheme_redirect", follow_redirects=True + ) + assert response.status_code == httpx.codes.OK + assert response.url == "https://example.org/" + assert len(response.history) == 1 + + +def test_fragment_redirect(): + with httpx.Client(transport=httpx.MockTransport(redirects)) as client: + response = client.get( + "https://example.org/relative_redirect#fragment", follow_redirects=True + ) + assert response.status_code == httpx.codes.OK + assert response.url == "https://example.org/#fragment" + assert len(response.history) == 1 + + +def test_multiple_redirects(): + with httpx.Client(transport=httpx.MockTransport(redirects)) as client: + response = client.get( + "https://example.org/multiple_redirects?count=20", follow_redirects=True + ) + assert response.status_code == httpx.codes.OK + assert response.url == "https://example.org/multiple_redirects" + assert len(response.history) == 20 + assert ( + response.history[0].url == "https://example.org/multiple_redirects?count=20" + ) + assert ( + response.history[1].url == "https://example.org/multiple_redirects?count=19" + ) + assert len(response.history[0].history) == 0 + assert len(response.history[1].history) == 1 + + +def test_too_many_redirects(): + with httpx.Client(transport=httpx.MockTransport(redirects)) as client: + with pytest.raises(httpx.TooManyRedirects): + client.get( + "https://example.org/multiple_redirects?count=21", follow_redirects=True + ) + + +def test_redirect_loop(): + with httpx.Client(transport=httpx.MockTransport(redirects)) as client: + with pytest.raises(httpx.TooManyRedirects): + client.get("https://example.org/redirect_loop", follow_redirects=True) + + +def test_cross_domain_redirect_with_auth_header(): + with httpx.Client(transport=httpx.MockTransport(redirects)) as client: + url = "https://example.com/cross_domain" + headers = {"Authorization": "abc"} + response = client.get(url, headers=headers, follow_redirects=True) + assert response.url == "https://example.org/cross_domain_target" + assert "authorization" not in response.json()["headers"] + + +def test_cross_domain_https_redirect_with_auth_header(): + with httpx.Client(transport=httpx.MockTransport(redirects)) as client: + url = "http://example.com/cross_domain" + headers = {"Authorization": "abc"} + response = client.get(url, headers=headers, follow_redirects=True) + assert response.url == "https://example.org/cross_domain_target" + assert "authorization" not in response.json()["headers"] + + +def test_cross_domain_redirect_with_auth(): + with httpx.Client(transport=httpx.MockTransport(redirects)) as client: + url = "https://example.com/cross_domain" + response = client.get(url, auth=("user", "pass"), follow_redirects=True) + assert response.url == "https://example.org/cross_domain_target" + assert "authorization" not in response.json()["headers"] + + +def test_same_domain_redirect(): + with httpx.Client(transport=httpx.MockTransport(redirects)) as client: + url = "https://example.org/cross_domain" + headers = {"Authorization": "abc"} + response = client.get(url, headers=headers, follow_redirects=True) + assert response.url == "https://example.org/cross_domain_target" + assert response.json()["headers"]["authorization"] == "abc" + + +def test_same_domain_https_redirect_with_auth_header(): + with httpx.Client(transport=httpx.MockTransport(redirects)) as client: + url = "http://example.org/cross_domain" + headers = {"Authorization": "abc"} + response = client.get(url, headers=headers, follow_redirects=True) + assert response.url == "https://example.org/cross_domain_target" + assert response.json()["headers"]["authorization"] == "abc" + + +def test_body_redirect(): + """ + A 308 redirect should preserve the request body. + """ + with httpx.Client(transport=httpx.MockTransport(redirects)) as client: + url = "https://example.org/redirect_body" + content = b"Example request body" + response = client.post(url, content=content, follow_redirects=True) + assert response.url == "https://example.org/redirect_body_target" + assert response.json()["body"] == "Example request body" + assert "content-length" in response.json()["headers"] + + +def test_no_body_redirect(): + """ + A 303 redirect should remove the request body. + """ + with httpx.Client(transport=httpx.MockTransport(redirects)) as client: + url = "https://example.org/redirect_no_body" + content = b"Example request body" + response = client.post(url, content=content, follow_redirects=True) + assert response.url == "https://example.org/redirect_body_target" + assert response.json()["body"] == "" + assert "content-length" not in response.json()["headers"] + + +def test_can_stream_if_no_redirect(): + with httpx.Client(transport=httpx.MockTransport(redirects)) as client: + url = "https://example.org/redirect_301" + with client.stream("GET", url, follow_redirects=False) as response: + pass + assert response.status_code == httpx.codes.MOVED_PERMANENTLY + assert response.headers["location"] == "https://example.org/" + + +class ConsumeBodyTransport(httpx.MockTransport): + def handle_request(self, request: httpx.Request) -> httpx.Response: + assert isinstance(request.stream, httpx.SyncByteStream) + for _ in request.stream: + pass + return self.handler(request) # type: ignore[return-value] + + +def test_cannot_redirect_streaming_body(): + with httpx.Client(transport=ConsumeBodyTransport(redirects)) as client: + url = "https://example.org/redirect_body" + + def streaming_body() -> typing.Iterator[bytes]: + yield b"Example request body" # pragma: no cover + + with pytest.raises(httpx.StreamConsumed): + client.post(url, content=streaming_body(), follow_redirects=True) + + +def test_cross_subdomain_redirect(): + with httpx.Client(transport=httpx.MockTransport(redirects)) as client: + url = "https://example.com/cross_subdomain" + response = client.get(url, follow_redirects=True) + assert response.url == "https://www.example.org/cross_subdomain" + + +def cookie_sessions(request: httpx.Request) -> httpx.Response: + if request.url.path == "/": + cookie = request.headers.get("Cookie") + if cookie is not None: + content = b"Logged in" + else: + content = b"Not logged in" + return httpx.Response(200, content=content) + + elif request.url.path == "/login": + status_code = httpx.codes.SEE_OTHER + headers = { + "location": "/", + "set-cookie": ( + "session=eyJ1c2VybmFtZSI6ICJ0b21; path=/; Max-Age=1209600; " + "httponly; samesite=lax" + ), + } + return httpx.Response(status_code, headers=headers) + + else: + assert request.url.path == "/logout" + status_code = httpx.codes.SEE_OTHER + headers = { + "location": "/", + "set-cookie": ( + "session=null; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT; " + "httponly; samesite=lax" + ), + } + return httpx.Response(status_code, headers=headers) + + +def test_redirect_cookie_behavior(): + with httpx.Client( + transport=httpx.MockTransport(cookie_sessions), follow_redirects=True + ) as client: + # The client is not logged in. + response = client.get("https://example.com/") + assert response.url == "https://example.com/" + assert response.text == "Not logged in" + + # Login redirects to the homepage, setting a session cookie. + response = client.post("https://example.com/login") + assert response.url == "https://example.com/" + assert response.text == "Logged in" + + # The client is logged in. + response = client.get("https://example.com/") + assert response.url == "https://example.com/" + assert response.text == "Logged in" + + # Logout redirects to the homepage, expiring the session cookie. + response = client.post("https://example.com/logout") + assert response.url == "https://example.com/" + assert response.text == "Not logged in" + + # The client is not logged in. + response = client.get("https://example.com/") + assert response.url == "https://example.com/" + assert response.text == "Not logged in" + + +def test_redirect_custom_scheme(): + with httpx.Client(transport=httpx.MockTransport(redirects)) as client: + with pytest.raises(httpx.UnsupportedProtocol) as e: + client.post( + "https://example.org/redirect_custom_scheme", follow_redirects=True + ) + assert str(e.value) == "Scheme 'market' not supported." + + +def test_invalid_redirect(): + with httpx.Client(transport=httpx.MockTransport(redirects)) as client: + with pytest.raises(httpx.RemoteProtocolError): + client.get("http://example.org/invalid_redirect", follow_redirects=True)