env: NOX_SESSION=test-3.7
- python: 3.8-dev
env: NOX_SESSION=test-3.8
+ dist: bionic # Required to get OpenSSL 1.1.1+
install:
- pip install --upgrade nox
20:54:17.742 - httpx.dispatch.http2 - receive_event stream_id=0 event=<PingReceived ping_data:0000000000000000>
20:54:17.743 - httpx.dispatch.connection_pool - release_connection connection=HTTPConnection(origin=Origin(scheme='https' host='www.google.com' port=443))
```
+
+`SSLKEYLOGFILE`
+-----------
+
+Valid values: a filename
+
+If this environment variable is set, TLS keys will be appended to the specified file, creating it if it doesn't exist, whenever key material is generated or received. The keylog file is designed for debugging purposes only.
+
+Support for `SSLKEYLOGFILE` requires Python 3.8 and OpenSSL 1.1.1 or newer.
+
+Example:
+
+```python
+# test_script.py
+
+import httpx
+client = httpx.Client()
+client.get("https://google.com")
+```
+
+```console
+SSLKEYLOGFILE=test.log python test_script.py
+cat test.log
+# TLS secrets log file, generated by OpenSSL / Python
+SERVER_HANDSHAKE_TRAFFIC_SECRET XXXX
+EXPORTER_SECRET XXXX
+SERVER_TRAFFIC_SECRET_0 XXXX
+CLIENT_HANDSHAKE_TRAFFIC_SECRET XXXX
+CLIENT_TRAFFIC_SECRET_0 XXXX
+SERVER_HANDSHAKE_TRAFFIC_SECRET XXXX
+EXPORTER_SECRET XXXX
+SERVER_TRAFFIC_SECRET_0 XXXX
+CLIENT_HANDSHAKE_TRAFFIC_SECRET XXXX
+CLIENT_TRAFFIC_SECRET_0 XXXX
+```
else:
dispatch = ASGIDispatch(app=app)
+ self.trust_env = True if trust_env is None else trust_env
+
if dispatch is None:
async_dispatch: AsyncDispatcher = ConnectionPool(
verify=verify,
http_versions=http_versions,
pool_limits=pool_limits,
backend=backend,
+ trust_env=self.trust_env,
)
elif isinstance(dispatch, Dispatcher):
async_dispatch = ThreadedDispatcher(dispatch, backend)
self.max_redirects = max_redirects
self.dispatch = async_dispatch
self.concurrency_backend = backend
- self.trust_env = True if trust_env is None else trust_env
@property
def headers(self) -> Headers:
+import os
import ssl
import typing
from pathlib import Path
SSL Configuration.
"""
- def __init__(self, *, cert: CertTypes = None, verify: VerifyTypes = True):
+ def __init__(
+ self,
+ *,
+ cert: CertTypes = None,
+ verify: VerifyTypes = True,
+ trust_env: bool = None,
+ ):
self.cert = cert
# Allow passing in our own SSLContext object that's pre-configured.
self.ssl_context: typing.Optional[ssl.SSLContext] = ssl_context
self.verify: typing.Union[str, bool] = verify
+ self.trust_env = trust_env
def __eq__(self, other: typing.Any) -> bool:
return (
if ssl.HAS_NPN: # pragma: no cover
context.set_npn_protocols(http_versions.alpn_identifiers)
+ if hasattr(context, "keylog_filename"):
+ keylogfile = os.environ.get("SSLKEYLOGFILE")
+ if keylogfile and self.trust_env:
+ context.keylog_filename = keylogfile # type: ignore
+
return context
def _load_client_certs(self, ssl_context: ssl.SSLContext) -> None:
origin: typing.Union[str, Origin],
verify: VerifyTypes = True,
cert: CertTypes = None,
+ trust_env: bool = None,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
http_versions: HTTPVersionTypes = None,
backend: ConcurrencyBackend = None,
release_func: typing.Optional[ReleaseCallback] = None,
):
self.origin = Origin(origin) if isinstance(origin, str) else origin
- self.ssl = SSLConfig(cert=cert, verify=verify)
+ self.ssl = SSLConfig(cert=cert, verify=verify, trust_env=trust_env)
self.timeout = TimeoutConfig(timeout)
self.http_versions = HTTPVersionConfig(http_versions)
self.backend = AsyncioBackend() if backend is None else backend
*,
verify: VerifyTypes = True,
cert: CertTypes = None,
+ trust_env: bool = None,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
http_versions: HTTPVersionTypes = None,
self.pool_limits = pool_limits
self.http_versions = http_versions
self.is_closed = False
+ self.trust_env = trust_env
self.keepalive_connections = ConnectionStore()
self.active_connections = ConnectionStore()
http_versions=self.http_versions,
backend=self.backend,
release_func=self.release_connection,
+ trust_env=self.trust_env,
)
logger.debug(f"new_connection connection={connection!r}")
else:
import ssl
+import sys
import pytest
def test_timeout_from_config_instance():
timeout = httpx.TimeoutConfig(timeout=5.0)
assert httpx.TimeoutConfig(timeout) == httpx.TimeoutConfig(timeout=5.0)
+
+
+@pytest.mark.skipif(
+ not hasattr(ssl.SSLContext, "keylog_filename"),
+ reason="requires OpenSSL 1.1.1 or higher",
+)
+@pytest.mark.skipif(sys.version_info < (3, 8), reason="requires python3.8 or higher")
+def test_ssl_config_support_for_keylog_file(tmpdir, monkeypatch):
+ with monkeypatch.context() as m:
+ m.delenv("SSLKEYLOGFILE", raising=False)
+
+ ssl_config = httpx.SSLConfig(trust_env=True)
+ ssl_config.load_ssl_context()
+
+ assert ssl_config.ssl_context.keylog_filename is None
+
+ filename = str(tmpdir.join("test.log"))
+
+ with monkeypatch.context() as m:
+ m.setenv("SSLKEYLOGFILE", filename)
+
+ ssl_config = httpx.SSLConfig(trust_env=True)
+ ssl_config.load_ssl_context()
+
+ assert ssl_config.ssl_context.keylog_filename == filename
+
+ ssl_config = httpx.SSLConfig(trust_env=False)
+ ssl_config.load_ssl_context()
+
+ assert ssl_config.ssl_context.keylog_filename is None