...
```
+## Event Hooks
+
+HTTPX allows you to register "event hooks" with the client, that are called
+every time a particular type of event takes place.
+
+There are currently two event hooks:
+
+* `request` - Called once a request is about to be sent. Passed the `request` instance.
+* `response` - Called once the response has been returned. Passed the `response` instance.
+
+These allow you to install client-wide functionality such as logging and monitoring.
+
+```python
+def log_request(request):
+ print(f"Request event hook: {request.method} {request.url} - Waiting for response")
+
+def log_response(response):
+ request = response.request
+ print(f"Response event hook: {request.method} {request.url} - Status {response.status_code}")
+
+client = httpx.Client(event_hooks={'request': [log_request], 'response': [log_response]})
+```
+
+You can also use these hooks to install response processing code, such as this
+example, which creates a client instance that always raises `httpx.HTTPStatusError`
+on 4xx and 5xx responses.
+
+```python
+def raise_on_4xx_5xx(response):
+ response.raise_for_status()
+
+client = httpx.Client(event_hooks={'response': [raise_on_4xx_5xx]})
+```
+
+Event hooks must always be set as a **list of callables**, and you may register
+multiple event hooks for each type of event.
+
+As well as being able to set event hooks on instantiating the client, there
+is also an `.event_hooks` property, that allows you to inspect and modify
+the installed hooks.
+
+```python
+client = httpx.Client()
+client.event_hooks['request'] = [log_request]
+client.event_hooks['response'] = [log_response, raise_for_status]
+```
+
+!!! note
+ If you are using HTTPX's async support, then you need to be aware that
+ hooks registered with `httpx.AsyncClient` MUST be async functions,
+ rather than plain functions.
+
## Monitoring download progress
If you need to monitor download progress of large responses, you can use response streaming and inspect the `response.num_bytes_downloaded` property.
cookies: CookieTypes = None,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
max_redirects: int = DEFAULT_MAX_REDIRECTS,
+ event_hooks: typing.Dict[str, typing.List[typing.Callable]] = None,
base_url: URLTypes = "",
trust_env: bool = True,
):
+ event_hooks = {} if event_hooks is None else event_hooks
+
self._base_url = self._enforce_trailing_slash(URL(base_url))
self._auth = self._build_auth(auth)
self._cookies = Cookies(cookies)
self._timeout = Timeout(timeout)
self.max_redirects = max_redirects
+ self._event_hooks = {
+ "request": list(event_hooks.get("request", [])),
+ "response": list(event_hooks.get("response", [])),
+ }
self._trust_env = trust_env
self._netrc = NetRCInfo()
self._is_closed = True
def timeout(self, timeout: TimeoutTypes) -> None:
self._timeout = Timeout(timeout)
+ @property
+ def event_hooks(self) -> typing.Dict[str, typing.List[typing.Callable]]:
+ return self._event_hooks
+
+ @event_hooks.setter
+ def event_hooks(
+ self, event_hooks: typing.Dict[str, typing.List[typing.Callable]]
+ ) -> None:
+ self._event_hooks = {
+ "request": list(event_hooks.get("request", [])),
+ "response": list(event_hooks.get("response", [])),
+ }
+
@property
def auth(self) -> typing.Optional[Auth]:
"""
limits: Limits = DEFAULT_LIMITS,
pool_limits: Limits = None,
max_redirects: int = DEFAULT_MAX_REDIRECTS,
+ event_hooks: typing.Dict[str, typing.List[typing.Callable]] = None,
base_url: URLTypes = "",
transport: httpcore.SyncHTTPTransport = None,
app: typing.Callable = None,
cookies=cookies,
timeout=timeout,
max_redirects=max_redirects,
+ event_hooks=event_hooks,
base_url=base_url,
trust_env=trust_env,
)
finally:
response.close()
+ try:
+ for hook in self._event_hooks["response"]:
+ hook(response)
+ except Exception:
+ response.close()
+ raise
+
return response
def _send_handling_auth(
auth_flow = auth.sync_auth_flow(request)
request = next(auth_flow)
+ for hook in self._event_hooks["request"]:
+ hook(request)
+
while True:
response = self._send_handling_redirects(
request,
limits: Limits = DEFAULT_LIMITS,
pool_limits: Limits = None,
max_redirects: int = DEFAULT_MAX_REDIRECTS,
+ event_hooks: typing.Dict[str, typing.List[typing.Callable]] = None,
base_url: URLTypes = "",
transport: httpcore.AsyncHTTPTransport = None,
app: typing.Callable = None,
cookies=cookies,
timeout=timeout,
max_redirects=max_redirects,
+ event_hooks=event_hooks,
base_url=base_url,
trust_env=trust_env,
)
finally:
await response.aclose()
+ try:
+ for hook in self._event_hooks["response"]:
+ await hook(response)
+ except Exception:
+ await response.aclose()
+ raise
+
return response
async def _send_handling_auth(
auth_flow = auth.async_auth_flow(request)
request = await auth_flow.__anext__()
+ for hook in self._event_hooks["request"]:
+ await hook(request)
+
while True:
response = await self._send_handling_redirects(
request,
--- /dev/null
+import pytest
+
+import httpx
+from tests.utils import AsyncMockTransport, MockTransport
+
+
+def app(request: httpx.Request) -> httpx.Response:
+ if request.url.path == "/redirect":
+ return httpx.Response(303, headers={"server": "testserver", "location": "/"})
+ elif request.url.path.startswith("/status/"):
+ status_code = int(request.url.path[-3:])
+ return httpx.Response(status_code, headers={"server": "testserver"})
+
+ return httpx.Response(200, headers={"server": "testserver"})
+
+
+def test_event_hooks():
+ events = []
+
+ def on_request(request):
+ events.append({"event": "request", "headers": dict(request.headers)})
+
+ def on_response(response):
+ events.append({"event": "response", "headers": dict(response.headers)})
+
+ event_hooks = {"request": [on_request], "response": [on_response]}
+
+ with httpx.Client(event_hooks=event_hooks, transport=MockTransport(app)) as http:
+ http.get("http://127.0.0.1:8000/", auth=("username", "password"))
+
+ assert events == [
+ {
+ "event": "request",
+ "headers": {
+ "host": "127.0.0.1:8000",
+ "user-agent": f"python-httpx/{httpx.__version__}",
+ "accept": "*/*",
+ "accept-encoding": "gzip, deflate, br",
+ "connection": "keep-alive",
+ "authorization": "Basic dXNlcm5hbWU6cGFzc3dvcmQ=",
+ },
+ },
+ {
+ "event": "response",
+ "headers": {"server": "testserver"},
+ },
+ ]
+
+
+def test_event_hooks_raising_exception(server):
+ def raise_on_4xx_5xx(response):
+ response.raise_for_status()
+
+ event_hooks = {"response": [raise_on_4xx_5xx]}
+
+ with httpx.Client(event_hooks=event_hooks, transport=MockTransport(app)) as http:
+ try:
+ http.get("http://127.0.0.1:8000/status/400")
+ except httpx.HTTPStatusError as exc:
+ assert exc.response.is_closed
+
+
+@pytest.mark.usefixtures("async_environment")
+async def test_async_event_hooks():
+ events = []
+
+ async def on_request(request):
+ events.append({"event": "request", "headers": dict(request.headers)})
+
+ async def on_response(response):
+ events.append({"event": "response", "headers": dict(response.headers)})
+
+ event_hooks = {"request": [on_request], "response": [on_response]}
+
+ async with httpx.AsyncClient(
+ event_hooks=event_hooks, transport=AsyncMockTransport(app)
+ ) as http:
+ await http.get("http://127.0.0.1:8000/", auth=("username", "password"))
+
+ assert events == [
+ {
+ "event": "request",
+ "headers": {
+ "host": "127.0.0.1:8000",
+ "user-agent": f"python-httpx/{httpx.__version__}",
+ "accept": "*/*",
+ "accept-encoding": "gzip, deflate, br",
+ "connection": "keep-alive",
+ "authorization": "Basic dXNlcm5hbWU6cGFzc3dvcmQ=",
+ },
+ },
+ {
+ "event": "response",
+ "headers": {"server": "testserver"},
+ },
+ ]
+
+
+@pytest.mark.usefixtures("async_environment")
+async def test_async_event_hooks_raising_exception():
+ async def raise_on_4xx_5xx(response):
+ response.raise_for_status()
+
+ event_hooks = {"response": [raise_on_4xx_5xx]}
+
+ async with httpx.AsyncClient(
+ event_hooks=event_hooks, transport=AsyncMockTransport(app)
+ ) as http:
+ try:
+ await http.get("http://127.0.0.1:8000/status/400")
+ except httpx.HTTPStatusError as exc:
+ assert exc.response.is_closed
+
+
+def test_event_hooks_with_redirect():
+ """
+ A redirect request should not trigger a second 'request' event hook.
+ """
+
+ events = []
+
+ def on_request(request):
+ events.append({"event": "request", "headers": dict(request.headers)})
+
+ def on_response(response):
+ events.append({"event": "response", "headers": dict(response.headers)})
+
+ event_hooks = {"request": [on_request], "response": [on_response]}
+
+ with httpx.Client(event_hooks=event_hooks, transport=MockTransport(app)) as http:
+ http.get("http://127.0.0.1:8000/redirect", auth=("username", "password"))
+
+ assert events == [
+ {
+ "event": "request",
+ "headers": {
+ "host": "127.0.0.1:8000",
+ "user-agent": f"python-httpx/{httpx.__version__}",
+ "accept": "*/*",
+ "accept-encoding": "gzip, deflate, br",
+ "connection": "keep-alive",
+ "authorization": "Basic dXNlcm5hbWU6cGFzc3dvcmQ=",
+ },
+ },
+ {
+ "event": "response",
+ "headers": {"server": "testserver"},
+ },
+ ]
+
+
+@pytest.mark.usefixtures("async_environment")
+async def test_async_event_hooks_with_redirect():
+ """
+ A redirect request should not trigger a second 'request' event hook.
+ """
+
+ events = []
+
+ async def on_request(request):
+ events.append({"event": "request", "headers": dict(request.headers)})
+
+ async def on_response(response):
+ events.append({"event": "response", "headers": dict(response.headers)})
+
+ event_hooks = {"request": [on_request], "response": [on_response]}
+
+ async with httpx.AsyncClient(
+ event_hooks=event_hooks, transport=AsyncMockTransport(app)
+ ) as http:
+ await http.get("http://127.0.0.1:8000/redirect", auth=("username", "password"))
+
+ assert events == [
+ {
+ "event": "request",
+ "headers": {
+ "host": "127.0.0.1:8000",
+ "user-agent": f"python-httpx/{httpx.__version__}",
+ "accept": "*/*",
+ "accept-encoding": "gzip, deflate, br",
+ "connection": "keep-alive",
+ "authorization": "Basic dXNlcm5hbWU6cGFzc3dvcmQ=",
+ },
+ },
+ {
+ "event": "response",
+ "headers": {"server": "testserver"},
+ },
+ ]