Here is a list of environment variables that HTTPX recognizes
and what function they serve:
-`HTTPX_DEBUG`
------------
+`HTTPX_LOG_LEVEL`
+-----------------
-Valid values: `1`, `true`
+Valid values: `debug`, `trace` (case-insensitive)
-If this environment variable is set to a valid value then low-level
-details about the execution of HTTP requests will be logged to `stderr`.
+If set to `trace`, then low-level details about the execution of HTTP requests will be logged to `stderr`. This can help you debug issues and see what's exactly being sent over the wire and to which location.
-This can help you debug issues and see what's exactly being sent
-over the wire and to which location.
+The `debug` log level is currently ignored, but is planned to issue high-level logs of HTTP requests.
Example:
```
```console
-user@host:~$ HTTPX_DEBUG=1 python test_script.py
+user@host:~$ HTTPX_LOG_LEVEL=trace python test_script.py
20:54:17.585 - httpx.dispatch.connection_pool - acquire_connection origin=Origin(scheme='https' host='www.google.com' port=443)
20:54:17.585 - httpx.dispatch.connection_pool - new_connection connection=HTTPConnection(origin=Origin(scheme='https' host='www.google.com' port=443))
20:54:17.590 - httpx.dispatch.connection - start_connect host='www.google.com' port=443 timeout=TimeoutConfig(timeout=5.0)
) -> ssl.SSLContext:
http_versions = HTTPVersionConfig() if http_versions is None else http_versions
- logger.debug(
+ logger.trace(
f"load_ssl_context "
f"verify={self.verify!r} "
f"cert={self.cert!r} "
pass
if ca_bundle_path.is_file():
- logger.debug(f"load_verify_locations cafile={ca_bundle_path!s}")
+ logger.trace(f"load_verify_locations cafile={ca_bundle_path!s}")
context.load_verify_locations(cafile=str(ca_bundle_path))
elif ca_bundle_path.is_dir():
- logger.debug(f"load_verify_locations capath={ca_bundle_path!s}")
+ logger.trace(f"load_verify_locations capath={ca_bundle_path!s}")
context.load_verify_locations(capath=str(ca_bundle_path))
self._load_client_certs(context)
else:
on_release = functools.partial(self.release_func, self)
- logger.debug(f"start_connect host={host!r} port={port!r} timeout={timeout!r}")
+ logger.trace(f"start_connect host={host!r} port={port!r} timeout={timeout!r}")
stream = await self.backend.open_tcp_stream(host, port, ssl_context, timeout)
http_version = stream.get_http_version()
- logger.debug(f"connected http_version={http_version!r}")
+ logger.trace(f"connected http_version={http_version!r}")
if http_version == "HTTP/2":
self.h2_connection = HTTP2Connection(
)
async def close(self) -> None:
- logger.debug("close_connection")
+ logger.trace("close_connection")
if self.h2_connection is not None:
await self.h2_connection.close()
elif self.h11_connection is not None:
return response
async def acquire_connection(self, origin: Origin) -> HTTPConnection:
- logger.debug(f"acquire_connection origin={origin!r}")
+ logger.trace(f"acquire_connection origin={origin!r}")
connection = self.pop_connection(origin)
if connection is None:
release_func=self.release_connection,
trust_env=self.trust_env,
)
- logger.debug(f"new_connection connection={connection!r}")
+ logger.trace(f"new_connection connection={connection!r}")
else:
- logger.debug(f"reuse_connection connection={connection!r}")
+ logger.trace(f"reuse_connection connection={connection!r}")
self.active_connections.add(connection)
return connection
async def release_connection(self, connection: HTTPConnection) -> None:
- logger.debug(f"release_connection connection={connection!r}")
+ logger.trace(f"release_connection connection={connection!r}")
if connection.is_closed:
self.active_connections.remove(connection)
self.max_connections.release()
async def close(self) -> None:
event = h11.ConnectionClosed()
try:
- logger.debug(f"send_event event={event!r}")
+ logger.trace(f"send_event event={event!r}")
self.h11_state.send(event)
except h11.LocalProtocolError: # pragma: no cover
# Premature client disconnect
"""
Send the request method, URL, and headers to the network.
"""
- logger.debug(
+ logger.trace(
f"send_headers method={request.method!r} "
f"target={request.url.full_path!r} "
f"headers={request.headers!r}"
try:
# Send the request body.
async for chunk in data:
- logger.debug(f"send_data data=Data(<{len(chunk)} bytes>)")
+ logger.trace(f"send_data data=Data(<{len(chunk)} bytes>)")
event = h11.Data(data=chunk)
await self._send_event(event, timeout)
event = self.h11_state.next_event()
if isinstance(event, h11.Data):
- logger.debug(f"receive_event event=Data(<{len(event.data)} bytes>)")
+ logger.trace(f"receive_event event=Data(<{len(event.data)} bytes>)")
else:
- logger.debug(f"receive_event event={event!r}")
+ logger.trace(f"receive_event event={event!r}")
if event is h11.NEED_DATA:
try:
return event
async def response_closed(self) -> None:
- logger.debug(
+ logger.trace(
f"response_closed "
f"our_state={self.h11_state.our_state!r} "
f"their_state={self.h11_state.their_state}"
(b":path", request.url.full_path.encode("ascii")),
] + [(k, v) for k, v in request.headers.raw if k != b"host"]
- logger.debug(
+ logger.trace(
f"send_headers "
f"stream_id={stream_id} "
f"method={request.method!r} "
await self.stream.write(data_to_send, timeout)
async def end_stream(self, stream_id: int, timeout: TimeoutConfig = None) -> None:
- logger.debug(f"end_stream stream_id={stream_id}")
+ logger.trace(f"end_stream stream_id={stream_id}")
self.h2_state.end_stream(stream_id)
data_to_send = self.h2_state.data_to_send()
await self.stream.write(data_to_send, timeout)
events = self.h2_state.receive_data(data)
for event in events:
event_stream_id = getattr(event, "stream_id", 0)
- logger.debug(
+ logger.trace(
f"receive_event stream_id={event_stream_id} event={event!r}"
)
async def acquire_connection(self, origin: Origin) -> HTTPConnection:
if self.should_forward_origin(origin):
- logger.debug(
+ logger.trace(
f"forward_connection proxy_url={self.proxy_url!r} origin={origin!r}"
)
return await super().acquire_connection(self.proxy_url.origin)
else:
- logger.debug(
+ logger.trace(
f"tunnel_connection proxy_url={self.proxy_url!r} origin={origin!r}"
)
return await self.tunnel_connection(origin)
# See if our tunnel has been opened successfully
proxy_response = await connection.send(proxy_request)
- logger.debug(
+ logger.trace(
f"tunnel_response "
f"proxy_url={self.proxy_url!r} "
f"origin={origin!r} "
ssl_context = await connection.get_ssl_context(ssl_config)
assert ssl_context is not None
- logger.debug(
+ logger.trace(
f"tunnel_start_tls "
f"proxy_url={self.proxy_url!r} "
f"origin={origin!r}"
hostname=origin.host, ssl_context=ssl_context, timeout=timeout
)
http_version = stream.get_http_version()
- logger.debug(
+ logger.trace(
f"tunnel_tls_complete "
f"proxy_url={self.proxy_url!r} "
f"origin={origin!r} "
_LOGGER_INITIALIZED = False
+TRACE_LOG_LEVEL = 5
-def get_logger(name: str) -> logging.Logger:
- """Gets a `logging.Logger` instance and optionally
- sets up debug logging if the user requests it via
- the `HTTPX_DEBUG=1` environment variable.
+class Logger(logging.Logger):
+ # Stub for type checkers.
+ def trace(self, message: str, *args: typing.Any, **kwargs: typing.Any) -> None:
+ ...
+
+
+def get_logger(name: str) -> Logger:
+ """
+ Get a `logging.Logger` instance, and optionally
+ set up debug logging based on the HTTPX_LOG_LEVEL environment variable.
"""
global _LOGGER_INITIALIZED
if not _LOGGER_INITIALIZED:
_LOGGER_INITIALIZED = True
- if os.environ.get("HTTPX_DEBUG", "").lower() in ("1", "true"):
+
+ log_level = os.environ.get("HTTPX_LOG_LEVEL", "").upper()
+ if log_level in ("DEBUG", "TRACE"):
logger = logging.getLogger("httpx")
- logger.setLevel(logging.DEBUG)
+ logger.setLevel(logging.DEBUG if log_level == "DEBUG" else TRACE_LOG_LEVEL)
handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(
logging.Formatter(
)
logger.addHandler(handler)
- return logging.getLogger(name)
+ logger = logging.getLogger(name)
+
+ def trace(message: str, *args: typing.Any, **kwargs: typing.Any) -> None:
+ logger.log(TRACE_LOG_LEVEL, message, *args, **kwargs)
+
+ logger.trace = trace # type: ignore
+
+ return typing.cast(Logger, logger)
def kv_format(**kwargs: typing.Any) -> str:
class MessageLoggerASGIMiddleware:
- def __init__(self, app: typing.Callable, logger: logging.Logger) -> None:
+ def __init__(self, app: typing.Callable, logger: Logger) -> None:
self.app = app
self.logger = logger
async def inner_receive() -> dict:
message = await receive()
logged_message = asgi_message_with_placeholders(message)
- self.logger.debug(f"sent {kv_format(**logged_message)}")
+ self.logger.trace(f"sent {kv_format(**logged_message)}")
return message
async def inner_send(message: dict) -> None:
logged_message = asgi_message_with_placeholders(message)
- self.logger.debug(f"received {kv_format(**logged_message)}")
+ self.logger.trace(f"received {kv_format(**logged_message)}")
await send(message)
logged_scope = dict(scope)
logged_scope["headers"] = list(
obfuscate_sensitive_headers(scope["headers"])
)
- self.logger.debug(f"started {kv_format(**logged_scope)}")
+ self.logger.trace(f"started {kv_format(**logged_scope)}")
try:
await self.app(scope, inner_receive, inner_send)
except BaseException as exc:
- self.logger.debug("raised_exception")
+ self.logger.trace("raised_exception")
raise exc from None
else:
- self.logger.debug("completed")
+ self.logger.trace("completed")
@pytest.mark.asyncio
-@pytest.mark.parametrize("httpx_debug", ["0", "1", "True", "False"])
-async def test_httpx_debug_enabled_stderr_logging(server, capsys, httpx_debug):
- os.environ["HTTPX_DEBUG"] = httpx_debug
+@pytest.mark.parametrize("httpx_log_level", ["trace", "debug"])
+async def test_httpx_log_level_enabled_stderr_logging(server, capsys, httpx_log_level):
+ os.environ["HTTPX_LOG_LEVEL"] = httpx_log_level
# Force a reload on the logging handlers
utils._LOGGER_INITIALIZED = False
async with httpx.AsyncClient() as client:
await client.get(server.url)
- if httpx_debug in ("1", "True"):
+ if httpx_log_level == "trace":
assert "httpx.dispatch.connection_pool" in capsys.readouterr().err
else:
assert "httpx.dispatch.connection_pool" not in capsys.readouterr().err