response = yield request
if response.status_code != 401 or "www-authenticate" not in response.headers:
- # If the response is not a 401 WWW-Authenticate, then we don't
+ # If the response is not a 401 then we don't
# need to build an authenticated request.
return
- challenge = self._parse_challenge(request, response)
+ for auth_header in response.headers.get_list("www-authenticate"):
+ if auth_header.lower().startswith("digest "):
+ break
+ else:
+ # If the response does not include a 'WWW-Authenticate: Digest ...'
+ # header, then we don't need to build an authenticated request.
+ return
+
+ challenge = self._parse_challenge(request, response, auth_header)
request.headers["Authorization"] = self._build_auth_header(request, challenge)
yield request
def _parse_challenge(
- self, request: Request, response: Response
+ self, request: Request, response: Response, auth_header: str
) -> "_DigestAuthChallenge":
"""
Returns a challenge from a Digest WWW-Authenticate header.
These take the form of:
`Digest realm="realm@host.com",qop="auth,auth-int",nonce="abc",opaque="xyz"`
"""
- header = response.headers["www-authenticate"]
+ scheme, _, fields = auth_header.partition(" ")
- scheme, _, fields = header.partition(" ")
- if scheme.lower() != "digest":
- message = "Header does not start with 'Digest'"
- raise ProtocolError(message, request=request)
+ # This method should only ever have been called with a Digest auth header.
+ assert scheme.lower() == "digest"
header_dict: typing.Dict[str, str] = {}
for field in parse_http_list(fields):
assert len(response.history) == 0
+def test_digest_auth_returns_no_auth_if_alternate_auth_scheme() -> None:
+ url = "https://example.org/"
+ auth = DigestAuth(username="tomchristie", password="password123")
+ auth_header = b"Token ..."
+
+ client = httpx.Client(
+ transport=SyncMockTransport(auth_header=auth_header, status_code=401)
+ )
+ response = client.get(url, auth=auth)
+
+ assert response.status_code == 401
+ assert response.json() == {"auth": None}
+ assert len(response.history) == 0
+
+
@pytest.mark.asyncio
async def test_digest_auth_200_response_including_digest_auth_header() -> None:
url = "https://example.org/"
"auth_header",
[
b'Digest realm="httpx@example.org", qop="auth"', # missing fields
- b'realm="httpx@example.org", qop="auth"', # not starting with Digest
- b'DigestZ realm="httpx@example.org", qop="auth"'
- b'qop="auth,auth-int",nonce="abc",opaque="xyz"',
b'Digest realm="httpx@example.org", qop="auth,au', # malformed fields list
],
)
"auth_header",
[
b'Digest realm="httpx@example.org", qop="auth"', # missing fields
- b'realm="httpx@example.org", qop="auth"', # not starting with Digest
- b'DigestZ realm="httpx@example.org", qop="auth"'
- b'qop="auth,auth-int",nonce="abc",opaque="xyz"',
b'Digest realm="httpx@example.org", qop="auth,au', # malformed fields list
],
)