ProxyError,
ReadError,
ReadTimeout,
- RedirectError,
RequestBodyUnavailable,
+ RequestError,
RequestNotRead,
ResponseClosed,
ResponseNotRead,
StreamError,
TimeoutException,
TooManyRedirects,
+ TransportError,
WriteError,
WriteTimeout,
)
"ProtocolError",
"ReadError",
"ReadTimeout",
- "RedirectError",
+ "RequestError",
"RequestBodyUnavailable",
"ResponseClosed",
"ResponseNotRead",
"ProxyError",
"TimeoutException",
"TooManyRedirects",
+ "TransportError",
"WriteError",
"WriteTimeout",
"URL",
# need to build an authenticated request.
return
- header = response.headers["www-authenticate"]
- challenge = self._parse_challenge(header)
+ challenge = self._parse_challenge(request, response)
request.headers["Authorization"] = self._build_auth_header(request, challenge)
yield request
- def _parse_challenge(self, header: str) -> "_DigestAuthChallenge":
+ def _parse_challenge(
+ self, request: Request, response: Response
+ ) -> "_DigestAuthChallenge":
"""
Returns a challenge from a Digest WWW-Authenticate header.
These take the form of:
`Digest realm="realm@host.com",qop="auth,auth-int",nonce="abc",opaque="xyz"`
"""
+ header = response.headers["www-authenticate"]
+
scheme, _, fields = header.partition(" ")
if scheme.lower() != "digest":
- raise ProtocolError("Header does not start with 'Digest'")
+ message = "Header does not start with 'Digest'"
+ raise ProtocolError(message, request=request)
header_dict: typing.Dict[str, str] = {}
for field in parse_http_list(fields):
realm=realm, nonce=nonce, qop=qop, opaque=opaque, algorithm=algorithm
)
except KeyError as exc:
- raise ProtocolError("Malformed Digest WWW-Authenticate header") from exc
+ message = "Malformed Digest WWW-Authenticate header"
+ raise ProtocolError(message, request=request) from exc
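For reference, the header-splitting step relied on above behaves roughly like this (a sketch; `parse_http_list` is the stdlib helper from `urllib.request` that the surrounding code appears to use, and the header value is a placeholder matching the docstring example):

from urllib.request import parse_http_list

fields = 'realm="realm@host.com",qop="auth,auth-int",nonce="abc",opaque="xyz"'
print(parse_http_list(fields))
# ['realm="realm@host.com"', 'qop="auth,auth-int"', 'nonce="abc"', 'opaque="xyz"']
# Commas inside quoted values are preserved, so each key="value" field stays intact.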
def _build_auth_header(
self, request: Request, challenge: "_DigestAuthChallenge"
if challenge.algorithm.lower().endswith("-sess"):
HA1 = digest(b":".join((HA1, challenge.nonce, cnonce)))
- qop = self._resolve_qop(challenge.qop)
+ qop = self._resolve_qop(challenge.qop, request=request)
if qop is None:
digest_data = [HA1, challenge.nonce, HA2]
else:
return header_value
- def _resolve_qop(self, qop: typing.Optional[bytes]) -> typing.Optional[bytes]:
+ def _resolve_qop(
+ self, qop: typing.Optional[bytes], request: Request
+ ) -> typing.Optional[bytes]:
if qop is None:
return None
qops = re.split(b", ?", qop)
if qops == [b"auth-int"]:
raise NotImplementedError("Digest auth-int support is not yet implemented")
- raise ProtocolError(f'Unexpected qop value "{qop!r}" in digest auth')
+ message = f'Unexpected qop value "{qop!r}" in digest auth'
+ raise ProtocolError(message, request=request)
class _DigestAuthChallenge:
# Check that we can handle the scheme
if url.scheme and url.scheme not in ("http", "https"):
- raise InvalidURL(f'Scheme "{url.scheme}" not supported.')
+ message = f'Scheme "{url.scheme}" not supported.'
+ raise InvalidURL(message, request=request)
# Handle malformed 'Location' headers that are "absolute" form, have no host.
# See: https://github.com/encode/httpx/issues/771
http2=http2,
)
- def _transport_for_url(self, url: URL) -> httpcore.SyncHTTPTransport:
+ def _transport_for_url(self, request: Request) -> httpcore.SyncHTTPTransport:
"""
Returns the transport instance that should be used for a given URL.
This will either be the standard connection pool, or a proxy.
"""
- enforce_http_url(url)
+ url = request.url
+ enforce_http_url(request)
if self._proxies and not should_not_be_proxied(url):
for matcher, transport in self._proxies.items():
timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> Response:
if request.url.scheme not in ("http", "https"):
- raise InvalidURL('URL scheme must be "http" or "https".')
+ message = 'URL scheme must be "http" or "https".'
+ raise InvalidURL(message, request=request)
timeout = self.timeout if isinstance(timeout, UnsetType) else Timeout(timeout)
"""
Sends a single request, without handling any redirections.
"""
- transport = self._transport_for_url(request.url)
+ transport = self._transport_for_url(request)
with map_exceptions(HTTPCORE_EXC_MAP, request=request):
(
http2=http2,
)
- def _transport_for_url(self, url: URL) -> httpcore.AsyncHTTPTransport:
+ def _transport_for_url(self, request: Request) -> httpcore.AsyncHTTPTransport:
"""
Returns the transport instance that should be used for a given URL.
This will either be the standard connection pool, or a proxy.
"""
- enforce_http_url(url)
+ url = request.url
+ enforce_http_url(request)
if self._proxies and not should_not_be_proxied(url):
for matcher, transport in self._proxies.items():
"""
Sends a single request, without handling any redirections.
"""
- transport = self._transport_for_url(request.url)
+ transport = self._transport_for_url(request)
with map_exceptions(HTTPCORE_EXC_MAP, request=request):
(
except ImportError: # pragma: nocover
brotli = None
+if typing.TYPE_CHECKING: # pragma: no cover
+ from ._models import Request
+
class Decoder:
+ def __init__(self, request: "Request") -> None:
+ self.request = request
+
def decode(self, data: bytes) -> bytes:
raise NotImplementedError() # pragma: nocover
See: https://stackoverflow.com/questions/1838699
"""
- def __init__(self) -> None:
+ def __init__(self, request: "Request") -> None:
+ self.request = request
self.first_attempt = True
self.decompressor = zlib.decompressobj()
if was_first_attempt:
self.decompressor = zlib.decompressobj(-zlib.MAX_WBITS)
return self.decode(data)
- raise DecodingError from exc
+ raise DecodingError(message=str(exc), request=self.request) from exc
def flush(self) -> bytes:
try:
return self.decompressor.flush()
except zlib.error as exc: # pragma: nocover
- raise DecodingError from exc
+ raise DecodingError(message=str(exc), request=self.request) from exc
class GZipDecoder(Decoder):
See: https://stackoverflow.com/questions/1838699
"""
- def __init__(self) -> None:
+ def __init__(self, request: "Request") -> None:
+ self.request = request
self.decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16)
def decode(self, data: bytes) -> bytes:
try:
return self.decompressor.decompress(data)
except zlib.error as exc:
- raise DecodingError from exc
+ raise DecodingError(message=str(exc), request=self.request) from exc
def flush(self) -> bytes:
try:
return self.decompressor.flush()
except zlib.error as exc: # pragma: nocover
- raise DecodingError from exc
+ raise DecodingError(message=str(exc), request=self.request) from exc
class BrotliDecoder(Decoder):
name. The top branches are for 'brotlipy' and bottom branches for 'Brotli'
"""
- def __init__(self) -> None:
+ def __init__(self, request: "Request") -> None:
assert (
brotli is not None
), "The 'brotlipy' or 'brotli' library must be installed to use 'BrotliDecoder'"
+ self.request = request
self.decompressor = brotli.Decompressor()
self.seen_data = False
if hasattr(self.decompressor, "decompress"):
try:
return self._decompress(data)
except brotli.error as exc:
- raise DecodingError from exc
+ raise DecodingError(message=str(exc), request=self.request) from exc
def flush(self) -> bytes:
if not self.seen_data:
self.decompressor.finish()
return b""
except brotli.error as exc: # pragma: nocover
- raise DecodingError from exc
+ raise DecodingError(message=str(exc), request=self.request) from exc
class MultiDecoder(Decoder):
Handles incrementally decoding bytes into text
"""
- def __init__(self, encoding: typing.Optional[str] = None):
+ def __init__(self, request: "Request", encoding: typing.Optional[str] = None):
+ self.request = request
self.decoder: typing.Optional[codecs.IncrementalDecoder] = (
None if encoding is None else codecs.getincrementaldecoder(encoding)()
)
self.buffer = None
return text
- except UnicodeDecodeError: # pragma: nocover
- raise DecodingError() from None
+ except UnicodeDecodeError as exc: # pragma: nocover
+ raise DecodingError(message=str(exc), request=self.request)
def flush(self) -> str:
try:
return bytes(self.buffer).decode(self._detector_result())
return self.decoder.decode(b"", True)
- except UnicodeDecodeError: # pragma: nocover
- raise DecodingError() from None
+ except UnicodeDecodeError as exc: # pragma: nocover
+ raise DecodingError(message=str(exc), request=self.request)
def _detector_result(self) -> str:
self.detector.close()
result = self.detector.result["encoding"]
if not result: # pragma: nocover
- raise DecodingError("Unable to determine encoding of content")
+ message = "Unable to determine encoding of content"
+ raise DecodingError(message, request=self.request)
return result
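For illustration, the request threaded through the decoders ends up attached to any DecodingError (a sketch only; `httpx._decoders` is a private module and the URL is a placeholder):

import httpx
from httpx._decoders import GZipDecoder

request = httpx.Request(method="GET", url="https://www.example.org/")
decoder = GZipDecoder(request=request)
try:
    decoder.decode(b"definitely not gzip data")  # zlib.error -> DecodingError
except httpx.DecodingError as exc:
    assert exc.request is request  # The originating request travels with the error.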
+"""
+Our exception hierarchy:
+
+* RequestError
+  + TransportError
+    - TimeoutException
+      · ConnectTimeout
+      · ReadTimeout
+      · WriteTimeout
+      · PoolTimeout
+    - NetworkError
+      · ConnectError
+      · ReadError
+      · WriteError
+      · CloseError
+    - ProxyError
+    - ProtocolError
+  + DecodingError
+  + TooManyRedirects
+  + RequestBodyUnavailable
+  + InvalidURL
+* HTTPStatusError
+* NotRedirectResponse
+* CookieConflict
+* StreamError
+  + StreamConsumed
+  + ResponseNotRead
+  + RequestNotRead
+  + ResponseClosed
+"""
import contextlib
import typing
from ._models import Request, Response # pragma: nocover
-class HTTPError(Exception):
+class RequestError(Exception):
"""
- Base class for all HTTPX exceptions.
+ Base class for all exceptions that may occur when issuing a `.request()`.
"""
- def __init__(
- self, *args: typing.Any, request: "Request" = None, response: "Response" = None
- ) -> None:
- super().__init__(*args)
- self._request = request or (response.request if response is not None else None)
- self.response = response
+ def __init__(self, message: str, *, request: "Request") -> None:
+ super().__init__(message)
+ self.request = request
- @property
- def request(self) -> "Request":
- # NOTE: this property exists so that a `Request` is exposed to type
- # checkers, instead of `Optional[Request]`.
- assert self._request is not None # Populated by the client.
- return self._request
+
+class TransportError(RequestError):
+ """
+ Base class for all exceptions that are mapped from the httpcore API.
+ """
# Timeout exceptions...
-class TimeoutException(HTTPError):
+class TimeoutException(TransportError):
"""
The base class for timeout errors.
# Core networking exceptions...
-class NetworkError(HTTPError):
+class NetworkError(TransportError):
"""
The base class for network-related errors.
# Other transport exceptions...
-class ProxyError(HTTPError):
+class ProxyError(TransportError):
"""
An error occurred while proxying a request.
"""
-class ProtocolError(HTTPError):
+class ProtocolError(TransportError):
"""
A protocol was violated by the server.
"""
-# HTTP exceptions...
+# Other request exceptions...
-class DecodingError(HTTPError):
+class DecodingError(RequestError):
"""
Decoding of the response failed.
"""
-class HTTPStatusError(HTTPError):
+class TooManyRedirects(RequestError):
"""
- Response sent an error HTTP status.
+ Too many redirects.
"""
- def __init__(self, *args: typing.Any, response: "Response") -> None:
- super().__init__(*args)
- self._request = response.request
- self.response = response
-
-# Redirect exceptions...
+class RequestBodyUnavailable(RequestError):
+ """
+ Had to send the request again, but the request body was streaming, and is
+ no longer available.
+ """
-class RedirectError(HTTPError):
+class InvalidURL(RequestError):
"""
- Base class for HTTP redirect errors.
+ URL was missing a hostname, or was not one of HTTP/HTTPS.
"""
-class TooManyRedirects(RedirectError):
+# Client errors
+
+
+class HTTPStatusError(Exception):
"""
- Too many redirects.
+ Response sent an error HTTP status.
+
+ May be raised when calling `response.raise_for_status()`.
"""
+ def __init__(
+ self, message: str, *, request: "Request", response: "Response"
+ ) -> None:
+ super().__init__(message)
+ self.request = request
+ self.response = response
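With the request now stored explicitly, both sides of a failed status check are available on the exception. A sketch (the URL is a placeholder and may not actually return an error status):

import httpx

response = httpx.get("https://www.example.org/some-missing-page")
try:
    response.raise_for_status()
except httpx.HTTPStatusError as exc:
    print(f"{exc.request.url!r} returned {exc.response.status_code}")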
-class NotRedirectResponse(RedirectError):
+
+class NotRedirectResponse(Exception):
"""
Response was not a redirect response.
+
+ May be raised if `response.next()` is called without first
+ properly checking `response.is_redirect`.
"""
+ def __init__(self, message: str) -> None:
+ super().__init__(message)
+
+
+class CookieConflict(Exception):
+ """
+ Attempted to lookup a cookie by name, but multiple cookies existed.
+
+ Can occur when calling `response.cookies.get(...)`.
+ """
+
+ def __init__(self, message: str) -> None:
+ super().__init__(message)
+
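Roughly the situation this guards against (a sketch that assumes the existing `Cookies.set()`/`Cookies.get()` keyword arguments; the domains are placeholders):

import httpx

cookies = httpx.Cookies()
cookies.set("session", "abc", domain="example.org")
cookies.set("session", "xyz", domain="other.example.org")

cookies.get("session", domain="example.org")  # Unambiguous lookup.
try:
    cookies.get("session")  # Two matches across domains.
except httpx.CookieConflict:
    pass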
# Stream exceptions...
+# These may occur as the result of a programming error, by accessing
+# the request/response stream in an invalid manner.
-class StreamError(HTTPError):
+
+class StreamError(Exception):
"""
The base class for stream exceptions.
an invalid way.
"""
-
-class RequestBodyUnavailable(StreamError):
- """
- Had to send the request again, but the request body was streaming, and is
- no longer available.
- """
+ def __init__(self, message: str) -> None:
+ super().__init__(message)
class StreamConsumed(StreamError):
been streamed.
"""
+ def __init__(self) -> None:
+ message = (
+ "Attempted to read or stream response content, but the content has "
+ "already been streamed."
+ )
+ super().__init__(message)
+
class ResponseNotRead(StreamError):
"""
after a streaming response.
"""
+ def __init__(self) -> None:
+ message = (
+ "Attempted to access response content, without having called `read()` "
+ "after a streaming response."
+ )
+ super().__init__(message)
+
class RequestNotRead(StreamError):
"""
Attempted to access request content, without having called `read()`.
"""
+ def __init__(self) -> None:
+ message = "Attempted to access request content, without having called `read()`."
+ super().__init__(message)
+
class ResponseClosed(StreamError):
"""
closed.
"""
-
-# Other cases...
-
-
-class InvalidURL(HTTPError):
- """
- URL was missing a hostname, or was not one of HTTP/HTTPS.
- """
+ def __init__(self) -> None:
+ message = (
+ "Attempted to read or stream response content, but the request has "
+ "been closed."
+ )
+ super().__init__(message)
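These stream exceptions flag misuse of the streaming API rather than I/O failures. A sketch of the pattern they enforce (illustrative; the URL is a placeholder):

import httpx

with httpx.Client() as client:
    with client.stream("GET", "https://www.example.org/") as response:
        try:
            print(response.text)  # ResponseNotRead: body not loaded yet.
        except httpx.ResponseNotRead:
            response.read()       # Load the body explicitly...
            print(response.text)  # ...after which access succeeds.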
-class CookieConflict(HTTPError):
- """
- Attempted to lookup a cookie by name, but multiple cookies existed.
- """
+# We're continuing to expose this earlier naming at the moment.
+# It is due to be deprecated. Don't use it.
+HTTPError = RequestError
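Existing `except httpx.HTTPError` blocks keep working because the alias points at the new base class, but note that `HTTPStatusError` no longer derives from it, so `raise_for_status()` failures now need their own handler. A sketch (placeholder URL):

import httpx

try:
    httpx.get("https://www.example.org/")
except httpx.HTTPError as exc:  # Same class object as httpx.RequestError.
    print(f"Request to {exc.request.url!r} failed: {exc!r}")

assert httpx.HTTPError is httpx.RequestError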
@contextlib.contextmanager
value = value.strip().lower()
try:
decoder_cls = SUPPORTED_DECODERS[value]
- decoders.append(decoder_cls())
+ decoders.append(decoder_cls(request=self.request))
except KeyError:
continue
if len(decoders) == 1:
self._decoder = decoders[0]
elif len(decoders) > 1:
- self._decoder = MultiDecoder(decoders)
+ self._decoder = MultiDecoder(children=decoders)
else:
- self._decoder = IdentityDecoder()
+ self._decoder = IdentityDecoder(request=self.request)
return self._decoder
if codes.is_client_error(self.status_code):
message = message.format(self, error_type="Client Error")
- raise HTTPStatusError(message, response=self)
+ raise HTTPStatusError(message, request=self.request, response=self)
elif codes.is_server_error(self.status_code):
message = message.format(self, error_type="Server Error")
- raise HTTPStatusError(message, response=self)
+ raise HTTPStatusError(message, request=self.request, response=self)
def json(self, **kwargs: typing.Any) -> typing.Any:
if self.charset_encoding is None and self.content and len(self.content) > 3:
that handles both gzip, deflate, etc but also detects the content's
string encoding.
"""
- decoder = TextDecoder(encoding=self.charset_encoding)
+ decoder = TextDecoder(request=self.request, encoding=self.charset_encoding)
for chunk in self.iter_bytes():
yield decoder.decode(chunk)
yield decoder.flush()
Get the next response from a redirect response.
"""
if not self.is_redirect:
- raise NotRedirectResponse()
+ message = (
+ "Called .next(), but the response was not a redirect. "
+ "Calling code should check `response.is_redirect` first."
+ )
+ raise NotRedirectResponse(message)
assert self.call_next is not None
return self.call_next()
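A sketch of the guarded usage this message steers callers towards (it assumes the `allow_redirects` flag from this version of the client API; the URL is a placeholder):

import httpx

with httpx.Client() as client:
    response = client.get("https://www.example.org/", allow_redirects=False)
    while response.is_redirect:     # Check first ...
        response = response.next()  # ... so NotRedirectResponse is never raised.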
that handles both gzip, deflate, etc but also detects the content's
string encoding.
"""
- decoder = TextDecoder(encoding=self.charset_encoding)
+ decoder = TextDecoder(request=self.request, encoding=self.charset_encoding)
async for chunk in self.aiter_bytes():
yield decoder.decode(chunk)
yield decoder.flush()
Get the next response from a redirect response.
"""
if not self.is_redirect:
- raise NotRedirectResponse()
+ raise NotRedirectResponse(
+ "Called .anext(), but the response was not a redirect. "
+ "Calling code should check `response.is_redirect` first."
+ )
assert self.call_next is not None
return await self.call_next()
from ._types import PrimitiveData
if typing.TYPE_CHECKING: # pragma: no cover
- from ._models import URL
+ from ._models import URL, Request
_HTML5_FORM_ENCODING_REPLACEMENTS = {'"': "%22", "\\": "\\\\"}
return typing.cast(Logger, logger)
-def enforce_http_url(url: "URL") -> None:
+def enforce_http_url(request: "Request") -> None:
"""
Raise an appropriate InvalidURL for any non-HTTP URLs.
"""
+ url = request.url
+
if not url.scheme:
- raise InvalidURL("No scheme included in URL.")
+ message = "No scheme included in URL."
+ raise InvalidURL(message, request=request)
if not url.host:
- raise InvalidURL("No host included in URL.")
+ message = "No host included in URL."
+ raise InvalidURL(message, request=request)
if url.scheme not in ("http", "https"):
- raise InvalidURL('URL scheme must be "http" or "https".')
+ message = 'URL scheme must be "http" or "https".'
+ raise InvalidURL(message, request=request)
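With the request attached, callers can see exactly which request was rejected. Roughly (a sketch; the URL is a placeholder):

import httpx

try:
    httpx.get("ftp://example.org/file.txt")
except httpx.InvalidURL as exc:
    print(f"Rejected {exc.request.url!r}: {exc}")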
def port_or_default(url: "URL") -> typing.Optional[int]:
)
def test_transport_for_request(url, proxies, expected):
client = httpx.AsyncClient(proxies=proxies)
- transport = client._transport_for_url(httpx.URL(url))
+ request = httpx.Request(method="GET", url=url)
+ transport = client._transport_for_url(request)
if expected is None:
assert transport is client._transport
monkeypatch.setenv(name, value)
client = client_class()
- transport = client._transport_for_url(httpx.URL(url))
+ request = httpx.Request(method="GET", url=url)
+ transport = client._transport_for_url(request)
if expected is None:
assert transport == client._transport
@pytest.mark.usefixtures("async_environment")
async def test_asgi_http_error():
- client = httpx.AsyncClient(app=partial(raise_exc, exc=httpx.HTTPError))
- with pytest.raises(httpx.HTTPError):
+ client = httpx.AsyncClient(app=partial(raise_exc, exc=RuntimeError))
+ with pytest.raises(RuntimeError):
await client.get("http://www.example.org/")
"decoder", (BrotliDecoder, DeflateDecoder, GZipDecoder, IdentityDecoder)
)
def test_decoders_empty_cases(decoder):
- instance = decoder()
+ request = httpx.Request(method="GET", url="https://www.example.com")
+ instance = decoder(request)
assert instance.decode(b"") == b""
assert instance.flush() == b""
def test_text_decoder_empty_cases():
- decoder = TextDecoder()
+ request = httpx.Request(method="GET", url="https://www.example.com")
+
+ decoder = TextDecoder(request=request)
assert decoder.flush() == ""
- decoder = TextDecoder()
+ decoder = TextDecoder(request=request)
assert decoder.decode(b"") == ""
assert decoder.flush() == ""
def test_wsgi_http_error():
- client = httpx.Client(app=partial(raise_exc, exc=httpx.HTTPError))
- with pytest.raises(httpx.HTTPError):
+ client = httpx.Client(app=partial(raise_exc, exc=RuntimeError))
+ with pytest.raises(RuntimeError):
client.get("http://www.example.org/")