assert r.text == "Hello World!"
```
-For some more complex cases you might need to customize the WSGI dispatch. This allows you to:
+For some more complex cases you might need to customize the WSGI transport. This allows you to:
* Inspect 500 error responses rather than raise exceptions by setting `raise_app_exceptions=False`.
* Mount the WSGI application at a subpath by setting `script_name` (WSGI).
```python
# Instantiate a client that makes WSGI requests with a client IP of "1.2.3.4".
-dispatch = httpx.WSGIDispatch(app=app, remote_addr="1.2.3.4")
-with httpx.Client(dispatch=dispatch, base_url="http://testserver") as client:
+transport = httpx.WSGITransport(app=app, remote_addr="1.2.3.4")
+with httpx.Client(transport=transport, base_url="http://testserver") as client:
...
```
>>> r
Response <200 OK>
```
+
+## Custom Transports
+
+HTTPX's `Client` also accepts a `transport` argument. This argument allows you
+to provide a custom Transport object that will be used to perform the actual
+sending of the requests.
+
+A transport instance must implement the Transport API defined by
+[`httpcore`](https://www.encode.io/httpcore/api/). You
+should either subclass `httpcore.AsyncHTTPTransport` to implement a transport to
+use with `AsyncClient`, or subclass `httpcore.SyncHTTPTransport` to implement a
+transport to use with `Client`.
+
+For example, HTTPX ships with a transport that uses the excellent
+[`urllib3` library](https://urllib3.readthedocs.io/en/latest/):
+
+```python
+>>> import httpx
+>>> client = httpx.Client(transport=httpx.URLLib3Transport())
+>>> client.get("https://example.org")
+<Response [200 OK]>
+```
+
+A complete example of a transport implementation would be:
+
+```python
+import json
+
+import httpcore
+import httpx
+
+
+class JSONEchoTransport(httpcore.SyncHTTPTransport):
+ """
+ A mock transport that returns a JSON response containing the request body.
+ """
+
+ def request(self, method, url, headers=None, stream=None, timeout=None):
+ body = b"".join(stream).decode("utf-8")
+ content = json.dumps({"body": body}).encode("utf-8")
+ stream = httpcore.SyncByteStream([content])
+ headers = [(b"content-type", b"application/json")]
+ return b"HTTP/1.1", 200, b"OK", headers, stream
+```
+
+Which we can use in the same way:
+
+```python
+>>> client = httpx.Client(transport=JSONEchoTransport())
+>>> response = client.post("https://httpbin.org/post", data="Hello, world!")
+>>> response.json()
+{'body': 'Hello, world!'}
+```
... assert r.text == "Hello World!"
```
-For some more complex cases you might need to customise the ASGI dispatch. This allows you to:
+For some more complex cases you might need to customise the ASGI transport. This allows you to:
* Inspect 500 error responses rather than raise exceptions by setting `raise_app_exceptions=False`.
* Mount the ASGI application at a subpath by setting `root_path`.
```python
# Instantiate a client that makes ASGI requests with a client IP of "1.2.3.4",
# on port 123.
-dispatch = httpx.ASGIDispatch(app=app, client=("1.2.3.4", 123))
-async with httpx.AsyncClient(dispatch=dispatch, base_url="http://testserver") as client:
+transport = httpx.ASGITransport(app=app, client=("1.2.3.4", 123))
+async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
...
```
from ._auth import Auth, BasicAuth, DigestAuth
from ._client import AsyncClient, Client
from ._config import PoolLimits, Proxy, Timeout
-from ._dispatch.asgi import ASGIDispatch
-from ._dispatch.wsgi import WSGIDispatch
from ._exceptions import (
ConnectTimeout,
CookieConflict,
)
from ._models import URL, Cookies, Headers, QueryParams, Request, Response
from ._status_codes import StatusCode, codes
+from ._transports.asgi import ASGIDispatch, ASGITransport
+from ._transports.urllib3 import URLLib3Transport
+from ._transports.wsgi import WSGIDispatch, WSGITransport
__all__ = [
"__description__",
"stream",
"codes",
"ASGIDispatch",
+ "ASGITransport",
"AsyncClient",
"Auth",
"BasicAuth",
"TooManyRedirects",
"WriteTimeout",
"URL",
+ "URLLib3Transport",
"StatusCode",
"Cookies",
"Headers",
"Response",
"DigestAuth",
"WSGIDispatch",
+ "WSGITransport",
]
UnsetType,
)
from ._content_streams import ContentStream
-from ._dispatch.asgi import ASGIDispatch
-from ._dispatch.wsgi import WSGIDispatch
from ._exceptions import HTTPError, InvalidURL, RequestBodyUnavailable, TooManyRedirects
from ._models import URL, Cookies, Headers, Origin, QueryParams, Request, Response
from ._status_codes import codes
+from ._transports.asgi import ASGITransport
+from ._transports.wsgi import WSGITransport
from ._types import (
AuthTypes,
CertTypes,
get_environment_proxies,
get_logger,
should_not_be_proxied,
+ warn_deprecated,
)
logger = get_logger(__name__)
return {"all": proxy}
elif isinstance(proxies, httpcore.AsyncHTTPTransport): # pragma: nocover
raise RuntimeError(
- "Passing a dispatcher instance to 'proxies=' is no longer "
+ "Passing a transport instance to 'proxies=' is no longer "
"supported. Use `httpx.Proxy() instead.`"
)
else:
new_proxies[str(key)] = proxy
elif isinstance(value, httpcore.AsyncHTTPTransport): # pragma: nocover
raise RuntimeError(
- "Passing a dispatcher instance to 'proxies=' is "
+ "Passing a transport instance to 'proxies=' is "
"no longer supported. Use `httpx.Proxy() instead.`"
)
return new_proxies
that should be followed.
* **base_url** - *(optional)* A URL to use as the base when building
request URLs.
- * **dispatch** - *(optional)* A dispatch class to use for sending requests
+ * **transport** - *(optional)* A transport class to use for sending requests
over the network.
+ * **dispatch** - *(optional)* A deprecated alias for transport.
* **app** - *(optional)* An WSGI application to send requests to,
rather than sending actual network requests.
* **trust_env** - *(optional)* Enables or disables usage of environment
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
max_redirects: int = DEFAULT_MAX_REDIRECTS,
base_url: URLTypes = None,
+ transport: httpcore.SyncHTTPTransport = None,
dispatch: httpcore.SyncHTTPTransport = None,
app: typing.Callable = None,
trust_env: bool = True,
proxy_map = self.get_proxy_map(proxies, trust_env)
- self.dispatch = self.init_dispatch(
+ if dispatch is not None:
+ warn_deprecated(
+ "The dispatch argument is deprecated since v0.13 and will be "
+ "removed in a future release, please use 'transport'"
+ )
+ if transport is None:
+ transport = dispatch
+
+ self.transport = self.init_transport(
verify=verify,
cert=cert,
pool_limits=pool_limits,
- dispatch=dispatch,
+ transport=transport,
app=app,
trust_env=trust_env,
)
self.proxies: typing.Dict[str, httpcore.SyncHTTPTransport] = {
- key: self.init_proxy_dispatch(
+ key: self.init_proxy_transport(
proxy,
verify=verify,
cert=cert,
for key, proxy in proxy_map.items()
}
- def init_dispatch(
+ def init_transport(
self,
verify: VerifyTypes = True,
cert: CertTypes = None,
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
- dispatch: httpcore.SyncHTTPTransport = None,
+ transport: httpcore.SyncHTTPTransport = None,
app: typing.Callable = None,
trust_env: bool = True,
) -> httpcore.SyncHTTPTransport:
- if dispatch is not None:
- return dispatch
+ if transport is not None:
+ return transport
if app is not None:
- return WSGIDispatch(app=app)
+ return WSGITransport(app=app)
ssl_context = SSLConfig(
verify=verify, cert=cert, trust_env=trust_env
max_connections=max_connections,
)
- def init_proxy_dispatch(
+ def init_proxy_transport(
self,
proxy: Proxy,
verify: VerifyTypes = True,
max_connections=max_connections,
)
- def dispatcher_for_url(self, url: URL) -> httpcore.SyncHTTPTransport:
+ def transport_for_url(self, url: URL) -> httpcore.SyncHTTPTransport:
"""
Returns the transport instance that should be used for a given URL.
This will either be the standard connection pool, or a proxy.
)
for proxy_key in proxy_keys:
if proxy_key and proxy_key in self.proxies:
- dispatcher = self.proxies[proxy_key]
- return dispatcher
+ transport = self.proxies[proxy_key]
+ return transport
- return self.dispatch
+ return self.transport
def request(
self,
Sends a single request, without handling any redirections.
"""
- dispatcher = self.dispatcher_for_url(request.url)
+ transport = self.transport_for_url(request.url)
try:
(
reason_phrase,
headers,
stream,
- ) = dispatcher.request(
+ ) = transport.request(
request.method.encode(),
request.url.raw,
headers=request.headers.raw,
)
def close(self) -> None:
- self.dispatch.close()
+ self.transport.close()
for proxy in self.proxies.values():
proxy.close()
that should be followed.
* **base_url** - *(optional)* A URL to use as the base when building
request URLs.
- * **dispatch** - *(optional)* A dispatch class to use for sending requests
+ * **transport** - *(optional)* A transport class to use for sending requests
over the network.
+ * **dispatch** - *(optional)* A deprecated alias for transport.
* **app** - *(optional)* An ASGI application to send requests to,
rather than sending actual network requests.
* **trust_env** - *(optional)* Enables or disables usage of environment
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
max_redirects: int = DEFAULT_MAX_REDIRECTS,
base_url: URLTypes = None,
+ transport: httpcore.AsyncHTTPTransport = None,
dispatch: httpcore.AsyncHTTPTransport = None,
app: typing.Callable = None,
trust_env: bool = True,
trust_env=trust_env,
)
+ if dispatch is not None:
+ warn_deprecated(
+ "The dispatch argument is deprecated since v0.13 and will be "
+ "removed in a future release, please use 'transport'",
+ )
+ if transport is None:
+ transport = dispatch
+
proxy_map = self.get_proxy_map(proxies, trust_env)
- self.dispatch = self.init_dispatch(
+ self.transport = self.init_transport(
verify=verify,
cert=cert,
http2=http2,
pool_limits=pool_limits,
- dispatch=dispatch,
+ transport=transport,
app=app,
trust_env=trust_env,
)
self.proxies: typing.Dict[str, httpcore.AsyncHTTPTransport] = {
- key: self.init_proxy_dispatch(
+ key: self.init_proxy_transport(
proxy,
verify=verify,
cert=cert,
for key, proxy in proxy_map.items()
}
- def init_dispatch(
+ def init_transport(
self,
verify: VerifyTypes = True,
cert: CertTypes = None,
http2: bool = False,
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
- dispatch: httpcore.AsyncHTTPTransport = None,
+ transport: httpcore.AsyncHTTPTransport = None,
app: typing.Callable = None,
trust_env: bool = True,
) -> httpcore.AsyncHTTPTransport:
- if dispatch is not None:
- return dispatch
+ if transport is not None:
+ return transport
if app is not None:
- return ASGIDispatch(app=app)
+ return ASGITransport(app=app)
ssl_context = SSLConfig(
verify=verify, cert=cert, trust_env=trust_env
http2=http2,
)
- def init_proxy_dispatch(
+ def init_proxy_transport(
self,
proxy: Proxy,
verify: VerifyTypes = True,
http2=http2,
)
- def dispatcher_for_url(self, url: URL) -> httpcore.AsyncHTTPTransport:
+ def transport_for_url(self, url: URL) -> httpcore.AsyncHTTPTransport:
"""
Returns the transport instance that should be used for a given URL.
This will either be the standard connection pool, or a proxy.
)
for proxy_key in proxy_keys:
if proxy_key and proxy_key in self.proxies:
- dispatcher = self.proxies[proxy_key]
- return dispatcher
+ transport = self.proxies[proxy_key]
+ return transport
- return self.dispatch
+ return self.transport
async def request(
self,
Sends a single request, without handling any redirections.
"""
- dispatcher = self.dispatcher_for_url(request.url)
+ transport = self.transport_for_url(request.url)
try:
(
reason_phrase,
headers,
stream,
- ) = await dispatcher.request(
+ ) = await transport.request(
request.method.encode(),
request.url.raw,
headers=request.headers.raw,
)
async def aclose(self) -> None:
- await self.dispatch.aclose()
+ await self.transport.aclose()
for proxy in self.proxies.values():
await proxy.aclose()
import json as jsonlib
import typing
import urllib.request
-import warnings
from collections.abc import MutableMapping
from http.cookiejar import Cookie, CookieJar
from urllib.parse import parse_qsl, urlencode
obfuscate_sensitive_headers,
parse_header_links,
str_query_param,
+ warn_deprecated,
)
if typing.TYPE_CHECKING: # pragma: no cover
@property
def stream(self): # type: ignore
- warnings.warn( # pragma: nocover
+ warn_deprecated( # pragma: nocover
"Response.stream() is due to be deprecated. "
"Use Response.aiter_bytes() instead.",
- DeprecationWarning,
)
return self.aiter_bytes # pragma: nocover
@property
def raw(self): # type: ignore
- warnings.warn( # pragma: nocover
+ warn_deprecated( # pragma: nocover
"Response.raw() is due to be deprecated. "
"Use Response.aiter_raw() instead.",
- DeprecationWarning,
)
return self.aiter_raw # pragma: nocover
import sniffio
from .._content_streams import ByteStream
+from .._utils import warn_deprecated
if typing.TYPE_CHECKING: # pragma: no cover
import asyncio
return asyncio.Event()
-class ASGIDispatch(httpcore.AsyncHTTPTransport):
+class ASGITransport(httpcore.AsyncHTTPTransport):
"""
- A custom AsyncDispatcher that handles sending requests directly to an ASGI app.
+ A custom AsyncTransport that handles sending requests directly to an ASGI app.
The simplest way to use this functionality is to use the `app` argument.
```
Alternatively, you can setup the dispatch instance explicitly.
This allows you to include any additional configuration arguments specific
- to the ASGIDispatch class:
+ to the ASGITransport class:
```
- dispatch = httpx.ASGIDispatch(
+ dispatch = httpx.ASGITransport(
app=app,
root_path="/submount",
client=("1.2.3.4", 123)
stream = ByteStream(b"".join(body_parts))
return (b"HTTP/1.1", status_code, b"", response_headers, stream)
+
+
+class ASGIDispatch(ASGITransport):
+ def __init__(
+ self,
+ app: Callable,
+ raise_app_exceptions: bool = True,
+ root_path: str = "",
+ client: Tuple[str, int] = ("127.0.0.1", 123),
+ ) -> None:
+ warn_deprecated("ASGIDispatch is deprecated, please use ASGITransport")
+ super().__init__(
+ app=app,
+ raise_app_exceptions=raise_app_exceptions,
+ root_path=root_path,
+ client=client,
+ )
from typing import Dict, Iterator, List, Optional, Tuple, Union
import httpcore
-import urllib3
-from urllib3.exceptions import MaxRetryError, SSLError
from .._config import DEFAULT_POOL_LIMITS, PoolLimits, Proxy, SSLConfig
from .._content_streams import ByteStream, IteratorStream
from .._types import CertTypes, VerifyTypes
-from .._utils import as_network_error
+from .._utils import as_network_error, warn_deprecated
+try:
+ import urllib3
+ from urllib3.exceptions import MaxRetryError, SSLError
+except ImportError: # pragma: nocover
+ urllib3 = None
-class URLLib3Dispatcher(httpcore.SyncHTTPTransport):
+
+class URLLib3Transport(httpcore.SyncHTTPTransport):
def __init__(
self,
*,
trust_env: bool = None,
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
):
+ assert (
+ urllib3 is not None
+ ), "urllib3 must be installed separately in order to use URLLib3Transport"
+
ssl_config = SSLConfig(
verify=verify, cert=cert, trust_env=trust_env, http2=False
)
def close(self) -> None:
self.pool.clear()
+
+
+class URLLib3Dispatch(URLLib3Transport):
+ def __init__(
+ self,
+ *,
+ proxy: Proxy = None,
+ verify: VerifyTypes = True,
+ cert: CertTypes = None,
+ trust_env: bool = None,
+ pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
+ ):
+ warn_deprecated("URLLib3Dispatch is deprecated, please use URLLib3Transport")
+ super().__init__(
+ proxy=proxy,
+ verify=verify,
+ cert=cert,
+ trust_env=trust_env,
+ pool_limits=pool_limits,
+ )
import httpcore
from .._content_streams import ByteStream, IteratorStream
+from .._utils import warn_deprecated
def _skip_leading_empty_chunks(body: typing.Iterable) -> typing.Iterable:
return []
-class WSGIDispatch(httpcore.SyncHTTPTransport):
+class WSGITransport(httpcore.SyncHTTPTransport):
"""
A custom transport that handles sending requests directly to an WSGI app.
The simplest way to use this functionality is to use the `app` argument.
Alternatively, you can setup the dispatch instance explicitly.
This allows you to include any additional configuration arguments specific
- to the WSGIDispatch class:
+ to the WSGITransport class:
```
- dispatch = httpx.WSGIDispatch(
+ dispatch = httpx.WSGITransport(
app=app,
script_name="/submount",
remote_addr="1.2.3.4"
stream = IteratorStream(chunk for chunk in result)
return (b"HTTP/1.1", status_code, b"", headers, stream)
+
+
+class WSGIDispatch(WSGITransport):
+ def __init__(
+ self,
+ app: typing.Callable,
+ raise_app_exceptions: bool = True,
+ script_name: str = "",
+ remote_addr: str = "127.0.0.1",
+ ) -> None:
+ warn_deprecated("WSGIDispatch is deprecated, please use WSGITransport")
+ super().__init__(
+ app=app,
+ raise_app_exceptions=raise_app_exceptions,
+ script_name=script_name,
+ remote_addr=remote_addr,
+ )
import re
import sys
import typing
+import warnings
from datetime import timedelta
from pathlib import Path
from time import perf_counter
if isinstance(exc, cls):
raise NetworkError(exc) from exc
raise
+
+
+def warn_deprecated(message: str) -> None:
+ warnings.warn(message, DeprecationWarning)
client = AsyncClient(dispatch=MockDispatch())
async def streaming_body():
- yield b"Example request body"
+ yield b"Example request body" # pragma: nocover
with pytest.raises(RequestBodyUnavailable):
await client.post(url, data=streaming_body(), auth=auth)
),
],
)
-def test_dispatcher_for_request(url, proxies, expected):
+def test_transport_for_request(url, proxies, expected):
client = httpx.AsyncClient(proxies=proxies)
- dispatcher = client.dispatcher_for_url(httpx.URL(url))
+ transport = client.transport_for_url(httpx.URL(url))
if expected is None:
- assert dispatcher is client.dispatch
+ assert transport is client.transport
else:
- assert dispatcher.proxy_origin == httpx.URL(expected).raw[:3]
+ assert transport.proxy_origin == httpx.URL(expected).raw[:3]
def test_unsupported_proxy_scheme():
monkeypatch.setenv(name, value)
client = httpx.AsyncClient()
- dispatcher = client.dispatcher_for_url(httpx.URL(url))
+ transport = client.transport_for_url(httpx.URL(url))
if expected is None:
- assert dispatcher == client.dispatch
+ assert transport == client.transport
else:
- assert dispatcher.proxy_origin == httpx.URL(expected).raw[:3]
+ assert transport.proxy_origin == httpx.URL(expected).raw[:3]