]> git.ipfire.org Git - thirdparty/httpx.git/commitdiff
Use consistent import style (#2493)
authorTom Christie <tom@tomchristie.com>
Tue, 6 Dec 2022 11:58:30 +0000 (11:58 +0000)
committerGitHub <noreply@github.com>
Tue, 6 Dec 2022 11:58:30 +0000 (11:58 +0000)
tests/client/test_auth.py

index 1f149a94e54c9bbf893162a889d7b356b95ba657..e889d8ef489952921c7c87f35d324e5accce28a3 100644 (file)
@@ -13,7 +13,6 @@ from urllib.request import parse_keqv_list
 import pytest
 
 import httpx
-from httpx import URL, Auth, BasicAuth, DigestAuth, ProtocolError, Request, Response
 
 from ..common import FIXTURES_DIR
 
@@ -75,10 +74,10 @@ class DigestApp:
         headers = {
             "www-authenticate": f'Digest realm="httpx@example.org", {challenge_str}',
         }
-        return Response(401, headers=headers)
+        return httpx.Response(401, headers=headers)
 
 
-class RepeatAuth(Auth):
+class RepeatAuth(httpx.Auth):
     """
     A mock authentication scheme that requires clients to send
     the request a fixed number of times, and then send a last request containing
@@ -91,7 +90,9 @@ class RepeatAuth(Auth):
     def __init__(self, repeat: int):
         self.repeat = repeat
 
-    def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
+    def auth_flow(
+        self, request: httpx.Request
+    ) -> typing.Generator[httpx.Request, httpx.Response, None]:
         nonces = []
 
         for index in range(self.repeat):
@@ -104,7 +105,7 @@ class RepeatAuth(Auth):
         yield request
 
 
-class ResponseBodyAuth(Auth):
+class ResponseBodyAuth(httpx.Auth):
     """
     A mock authentication scheme that requires clients to send an 'Authorization'
     header, then send back the contents of the response in the 'Authorization'
@@ -116,7 +117,9 @@ class ResponseBodyAuth(Auth):
     def __init__(self, token: str) -> None:
         self.token = token
 
-    def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
+    def auth_flow(
+        self, request: httpx.Request
+    ) -> typing.Generator[httpx.Request, httpx.Response, None]:
         request.headers["Authorization"] = self.token
         response = yield request
         data = response.text
@@ -124,7 +127,7 @@ class ResponseBodyAuth(Auth):
         yield request
 
 
-class SyncOrAsyncAuth(Auth):
+class SyncOrAsyncAuth(httpx.Auth):
     """
     A mock authentication scheme that uses a different implementation for the
     sync and async cases.
@@ -135,15 +138,15 @@ class SyncOrAsyncAuth(Auth):
         self._async_lock = asyncio.Lock()
 
     def sync_auth_flow(
-        self, request: Request
-    ) -> typing.Generator[Request, Response, None]:
+        self, request: httpx.Request
+    ) -> typing.Generator[httpx.Request, httpx.Response, None]:
         with self._lock:
             request.headers["Authorization"] = "sync-auth"
         yield request
 
     async def async_auth_flow(
-        self, request: Request
-    ) -> typing.AsyncGenerator[Request, Response]:
+        self, request: httpx.Request
+    ) -> typing.AsyncGenerator[httpx.Request, httpx.Response]:
         async with self._async_lock:
             request.headers["Authorization"] = "async-auth"
         yield request
@@ -213,7 +216,7 @@ async def test_custom_auth() -> None:
     url = "https://example.org/"
     app = App()
 
-    def auth(request: Request) -> Request:
+    def auth(request: httpx.Request) -> httpx.Request:
         request.headers["Authorization"] = "Token 123"
         return request
 
@@ -295,8 +298,8 @@ async def test_auth_disable_per_request() -> None:
 def test_auth_hidden_url() -> None:
     url = "http://example-username:example-password@example.org/"
     expected = "URL('http://example-username:[secure]@example.org/')"
-    assert url == URL(url)
-    assert expected == repr(URL(url))
+    assert url == httpx.URL(url)
+    assert expected == repr(httpx.URL(url))
 
 
 @pytest.mark.asyncio
@@ -319,7 +322,7 @@ async def test_auth_property() -> None:
         assert client.auth is None
 
         client.auth = ("user", "password123")  # type: ignore
-        assert isinstance(client.auth, BasicAuth)
+        assert isinstance(client.auth, httpx.BasicAuth)
 
         url = "https://example.org/"
         response = await client.get(url)
@@ -348,7 +351,7 @@ async def test_auth_invalid_type() -> None:
 @pytest.mark.asyncio
 async def test_digest_auth_returns_no_auth_if_no_digest_header_in_response() -> None:
     url = "https://example.org/"
-    auth = DigestAuth(username="user", password="password123")
+    auth = httpx.DigestAuth(username="user", password="password123")
     app = App()
 
     async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client:
@@ -361,7 +364,7 @@ async def test_digest_auth_returns_no_auth_if_no_digest_header_in_response() ->
 
 def test_digest_auth_returns_no_auth_if_alternate_auth_scheme() -> None:
     url = "https://example.org/"
-    auth = DigestAuth(username="user", password="password123")
+    auth = httpx.DigestAuth(username="user", password="password123")
     auth_header = "Token ..."
     app = App(auth_header=auth_header, status_code=401)
 
@@ -376,7 +379,7 @@ def test_digest_auth_returns_no_auth_if_alternate_auth_scheme() -> None:
 @pytest.mark.asyncio
 async def test_digest_auth_200_response_including_digest_auth_header() -> None:
     url = "https://example.org/"
-    auth = DigestAuth(username="user", password="password123")
+    auth = httpx.DigestAuth(username="user", password="password123")
     auth_header = 'Digest realm="realm@host.com",qop="auth",nonce="abc",opaque="xyz"'
     app = App(auth_header=auth_header, status_code=200)
 
@@ -391,7 +394,7 @@ async def test_digest_auth_200_response_including_digest_auth_header() -> None:
 @pytest.mark.asyncio
 async def test_digest_auth_401_response_without_digest_auth_header() -> None:
     url = "https://example.org/"
-    auth = DigestAuth(username="user", password="password123")
+    auth = httpx.DigestAuth(username="user", password="password123")
     app = App(auth_header="", status_code=401)
 
     async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client:
@@ -420,7 +423,7 @@ async def test_digest_auth(
     algorithm: str, expected_hash_length: int, expected_response_length: int
 ) -> None:
     url = "https://example.org/"
-    auth = DigestAuth(username="user", password="password123")
+    auth = httpx.DigestAuth(username="user", password="password123")
     app = DigestApp(algorithm=algorithm)
 
     async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client:
@@ -451,7 +454,7 @@ async def test_digest_auth(
 @pytest.mark.asyncio
 async def test_digest_auth_no_specified_qop() -> None:
     url = "https://example.org/"
-    auth = DigestAuth(username="user", password="password123")
+    auth = httpx.DigestAuth(username="user", password="password123")
     app = DigestApp(qop="")
 
     async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client:
@@ -483,7 +486,7 @@ async def test_digest_auth_no_specified_qop() -> None:
 @pytest.mark.asyncio
 async def test_digest_auth_qop_including_spaces_and_auth_returns_auth(qop: str) -> None:
     url = "https://example.org/"
-    auth = DigestAuth(username="user", password="password123")
+    auth = httpx.DigestAuth(username="user", password="password123")
     app = DigestApp(qop=qop)
 
     async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client:
@@ -496,7 +499,7 @@ async def test_digest_auth_qop_including_spaces_and_auth_returns_auth(qop: str)
 @pytest.mark.asyncio
 async def test_digest_auth_qop_auth_int_not_implemented() -> None:
     url = "https://example.org/"
-    auth = DigestAuth(username="user", password="password123")
+    auth = httpx.DigestAuth(username="user", password="password123")
     app = DigestApp(qop="auth-int")
 
     async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client:
@@ -507,18 +510,18 @@ async def test_digest_auth_qop_auth_int_not_implemented() -> None:
 @pytest.mark.asyncio
 async def test_digest_auth_qop_must_be_auth_or_auth_int() -> None:
     url = "https://example.org/"
-    auth = DigestAuth(username="user", password="password123")
+    auth = httpx.DigestAuth(username="user", password="password123")
     app = DigestApp(qop="not-auth")
 
     async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client:
-        with pytest.raises(ProtocolError):
+        with pytest.raises(httpx.ProtocolError):
             await client.get(url, auth=auth)
 
 
 @pytest.mark.asyncio
 async def test_digest_auth_incorrect_credentials() -> None:
     url = "https://example.org/"
-    auth = DigestAuth(username="user", password="password123")
+    auth = httpx.DigestAuth(username="user", password="password123")
     app = DigestApp(send_response_after_attempt=2)
 
     async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client:
@@ -531,7 +534,7 @@ async def test_digest_auth_incorrect_credentials() -> None:
 @pytest.mark.asyncio
 async def test_digest_auth_reuses_challenge() -> None:
     url = "https://example.org/"
-    auth = DigestAuth(username="user", password="password123")
+    auth = httpx.DigestAuth(username="user", password="password123")
     app = DigestApp()
 
     async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client:
@@ -548,7 +551,7 @@ async def test_digest_auth_reuses_challenge() -> None:
 @pytest.mark.asyncio
 async def test_digest_auth_resets_nonce_count_after_401() -> None:
     url = "https://example.org/"
-    auth = DigestAuth(username="user", password="password123")
+    auth = httpx.DigestAuth(username="user", password="password123")
     app = DigestApp()
 
     async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client:
@@ -596,11 +599,11 @@ async def test_async_digest_auth_raises_protocol_error_on_malformed_header(
     auth_header: str,
 ) -> None:
     url = "https://example.org/"
-    auth = DigestAuth(username="user", password="password123")
+    auth = httpx.DigestAuth(username="user", password="password123")
     app = App(auth_header=auth_header, status_code=401)
 
     async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client:
-        with pytest.raises(ProtocolError):
+        with pytest.raises(httpx.ProtocolError):
             await client.get(url, auth=auth)
 
 
@@ -615,11 +618,11 @@ def test_sync_digest_auth_raises_protocol_error_on_malformed_header(
     auth_header: str,
 ) -> None:
     url = "https://example.org/"
-    auth = DigestAuth(username="user", password="password123")
+    auth = httpx.DigestAuth(username="user", password="password123")
     app = App(auth_header=auth_header, status_code=401)
 
     with httpx.Client(transport=httpx.MockTransport(app)) as client:
-        with pytest.raises(ProtocolError):
+        with pytest.raises(httpx.ProtocolError):
             client.get(url, auth=auth)
 
 
@@ -677,7 +680,7 @@ def test_sync_auth_history() -> None:
 
 
 class ConsumeBodyTransport(httpx.MockTransport):
-    async def handle_async_request(self, request: Request) -> Response:
+    async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
         assert isinstance(request.stream, httpx.AsyncByteStream)
         [_ async for _ in request.stream]
         return self.handler(request)
@@ -686,7 +689,7 @@ class ConsumeBodyTransport(httpx.MockTransport):
 @pytest.mark.asyncio
 async def test_digest_auth_unavailable_streaming_body():
     url = "https://example.org/"
-    auth = DigestAuth(username="user", password="password123")
+    auth = httpx.DigestAuth(username="user", password="password123")
     app = DigestApp()
 
     async def streaming_body() -> typing.AsyncIterator[bytes]: