TextDecoder,
)
from ._exceptions import (
+ HTTPCORE_EXC_MAP,
CookieConflict,
HTTPStatusError,
InvalidURL,
ResponseClosed,
ResponseNotRead,
StreamConsumed,
+ map_exceptions,
)
from ._status_codes import codes
from ._types import (
raise ResponseClosed()
self.is_stream_consumed = True
- for part in self._raw_stream:
- yield part
+ with map_exceptions(HTTPCORE_EXC_MAP, request=self.request):
+ for part in self._raw_stream:
+ yield part
self.close()
def next(self) -> "Response":
raise ResponseClosed()
self.is_stream_consumed = True
- async for part in self._raw_stream:
- yield part
+ with map_exceptions(HTTPCORE_EXC_MAP, request=self.request):
+ async for part in self._raw_stream:
+ yield part
await self.aclose()
async def anext(self) -> "Response":
assert scope["type"] == "http"
if scope["path"].startswith("/slow_response"):
await slow_response(scope, receive, send)
+ elif scope["path"].startswith("/slow_stream_response"):
+ await slow_stream_response(scope, receive, send)
elif scope["path"].startswith("/status"):
await status_code(scope, receive, send)
elif scope["path"].startswith("/echo_body"):
await send({"type": "http.response.body", "body": b"Hello, world!"})
+async def slow_stream_response(scope, receive, send):
+ await send(
+ {
+ "type": "http.response.start",
+ "status": 200,
+ "headers": [[b"content-type", b"text/plain"]],
+ }
+ )
+
+ await sleep(1)
+ await send({"type": "http.response.body", "body": b"", "more_body": False})
+
+
async def status_code(scope, receive, send):
status_code = int(scope["path"].replace("/status/", ""))
await send(
pytest.fail(f"Unmapped httpcore exceptions: {not_mapped}")
-def test_httpcore_exception_mapping() -> None:
+def test_httpcore_exception_mapping(server) -> None:
"""
HTTPCore exception mapping works as expected.
"""
with pytest.raises(httpx.ConnectError):
httpx.get("http://doesnotexist")
+ # Make sure streaming methods also map exceptions.
+ url = server.url.copy_with(path="/slow_stream_response")
+ timeout = httpx.Timeout(None, read=0.1)
+ with httpx.stream("GET", url, timeout=timeout) as stream:
+ with pytest.raises(httpx.ReadTimeout):
+ stream.read()
+
# Make sure it also works with custom transports.
class MockTransport(httpcore.SyncHTTPTransport):
def request(self, *args: Any, **kwargs: Any) -> Any: