)
from .content_streams import ContentStream
from .dispatch.asgi import ASGIDispatch
-from .dispatch.base import Dispatcher
+from .dispatch.base import AsyncDispatcher
from .dispatch.connection_pool import ConnectionPool
from .dispatch.proxy_http import HTTPProxy
from .exceptions import (
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
max_redirects: int = DEFAULT_MAX_REDIRECTS,
base_url: URLTypes = None,
- dispatch: Dispatcher = None,
+ dispatch: AsyncDispatcher = None,
app: typing.Callable = None,
backend: typing.Union[str, ConcurrencyBackend] = "auto",
trust_env: bool = True,
if proxies is None and trust_env:
proxies = typing.cast(ProxiesTypes, get_environment_proxies())
- self.proxies: typing.Dict[str, Dispatcher] = _proxies_to_dispatchers(
+ self.proxies: typing.Dict[str, AsyncDispatcher] = _proxies_to_dispatchers(
proxies,
verify=verify,
cert=cert,
return response
- def dispatcher_for_url(self, url: URL) -> Dispatcher:
+ def dispatcher_for_url(self, url: URL) -> AsyncDispatcher:
"""
- Returns the Dispatcher instance that should be used for a given URL.
+ Returns the AsyncDispatcher instance that should be used for a given URL.
This will either be the standard connection pool, or a proxy.
"""
if self.proxies:
)
for proxy_key in proxy_keys:
if proxy_key and proxy_key in self.proxies:
- dispatcher = self.proxies[proxy_key]
- return dispatcher
+ return self.proxies[proxy_key]
return self.dispatch
pool_limits: PoolLimits,
backend: typing.Union[str, ConcurrencyBackend],
trust_env: bool,
-) -> typing.Dict[str, Dispatcher]:
- def _proxy_from_url(url: URLTypes) -> Dispatcher:
+) -> typing.Dict[str, AsyncDispatcher]:
+ def _proxy_from_url(url: URLTypes) -> AsyncDispatcher:
nonlocal verify, cert, http2, pool_limits, backend, trust_env
url = URL(url)
if url.scheme in ("http", "https"):
return {}
elif isinstance(proxies, (str, URL)):
return {"all": _proxy_from_url(proxies)}
- elif isinstance(proxies, Dispatcher):
+ elif isinstance(proxies, AsyncDispatcher):
return {"all": proxies}
else:
new_proxies = {}
from ..config import TimeoutTypes
from ..content_streams import ByteStream
from ..models import Request, Response
-from .base import Dispatcher
+from .base import AsyncDispatcher
-class ASGIDispatch(Dispatcher):
+class ASGIDispatch(AsyncDispatcher):
"""
- A custom dispatcher that handles sending requests directly to an ASGI app.
+ A custom AsyncDispatcher that handles sending requests directly to an ASGI app.
The simplest way to use this functionality is to use the `app` argument.
This will automatically infer if 'app' is a WSGI or an ASGI application,
)
-class Dispatcher:
+class AsyncDispatcher:
"""
- Base class for dispatcher classes, that handle sending the request.
+ Base class for AsyncDispatcher classes, that handle sending the request.
Stubs out the interface, as well as providing a `.request()` convenience
- implementation, to make it easy to use or test stand-alone dispatchers,
+ implementation, to make it easy to use or test stand-alone AsyncDispatchers,
without requiring a complete `AsyncClient` instance.
"""
async def close(self) -> None:
pass # pragma: nocover
- async def __aenter__(self) -> "Dispatcher":
+ async def __aenter__(self) -> "AsyncDispatcher":
return self
async def __aexit__(
from ..config import SSLConfig, Timeout
from ..models import URL, Origin, Request, Response
from ..utils import get_logger
-from .base import Dispatcher
+from .base import AsyncDispatcher
from .http2 import HTTP2Connection
from .http11 import HTTP11Connection
logger = get_logger(__name__)
-class HTTPConnection(Dispatcher):
+class HTTPConnection(AsyncDispatcher):
def __init__(
self,
origin: typing.Union[str, Origin],
from ..exceptions import PoolTimeout
from ..models import Origin, Request, Response
from ..utils import get_logger
-from .base import Dispatcher
+from .base import AsyncDispatcher
from .connection import HTTPConnection
CONNECTIONS_DICT = typing.Dict[Origin, typing.List[HTTPConnection]]
return len(self.all)
-class ConnectionPool(Dispatcher):
+class ConnectionPool(AsyncDispatcher):
KEEP_ALIVE_EXPIRY = 5.0
def __init__(
)
if typing.TYPE_CHECKING: # pragma: no cover
- from .dispatch.base import Dispatcher # noqa: F401
+ from .dispatch.base import AsyncDispatcher # noqa: F401
PrimitiveData = typing.Optional[typing.Union[str, int, float, bool]]
CookieTypes = typing.Union["Cookies", CookieJar, typing.Dict[str, str]]
ProxiesTypes = typing.Union[
- URLTypes, "Dispatcher", typing.Dict[URLTypes, typing.Union[URLTypes, "Dispatcher"]]
+ URLTypes,
+ "AsyncDispatcher",
+ typing.Dict[URLTypes, typing.Union[URLTypes, "AsyncDispatcher"]],
]
)
from httpx.auth import Auth, AuthFlow
from httpx.config import CertTypes, TimeoutTypes, VerifyTypes
-from httpx.dispatch.base import Dispatcher
+from httpx.dispatch.base import AsyncDispatcher
-class MockDispatch(Dispatcher):
+class MockDispatch(AsyncDispatcher):
def __init__(self, auth_header: str = "", status_code: int = 200) -> None:
self.auth_header = auth_header
self.status_code = status_code
)
-class MockDigestAuthDispatch(Dispatcher):
+class MockDigestAuthDispatch(AsyncDispatcher):
def __init__(
self,
algorithm: str = "SHA-256",
from httpx import AsyncClient, Cookies, Request, Response
from httpx.config import CertTypes, TimeoutTypes, VerifyTypes
-from httpx.dispatch.base import Dispatcher
+from httpx.dispatch.base import AsyncDispatcher
-class MockDispatch(Dispatcher):
+class MockDispatch(AsyncDispatcher):
async def send(
self,
request: Request,
from httpx import AsyncClient, Headers, Request, Response, __version__
from httpx.config import CertTypes, TimeoutTypes, VerifyTypes
-from httpx.dispatch.base import Dispatcher
+from httpx.dispatch.base import AsyncDispatcher
-class MockDispatch(Dispatcher):
+class MockDispatch(AsyncDispatcher):
async def send(
self,
request: Request,
from httpx import URL, AsyncClient, QueryParams, Request, Response
from httpx.config import CertTypes, TimeoutTypes, VerifyTypes
-from httpx.dispatch.base import Dispatcher
+from httpx.dispatch.base import AsyncDispatcher
-class MockDispatch(Dispatcher):
+class MockDispatch(AsyncDispatcher):
async def send(
self,
request: Request,
codes,
)
from httpx.config import CertTypes, TimeoutTypes, VerifyTypes
-from httpx.dispatch.base import Dispatcher
+from httpx.dispatch.base import AsyncDispatcher
-class MockDispatch(Dispatcher):
+class MockDispatch(AsyncDispatcher):
async def send(
self,
request: Request,
assert response.url == URL("https://www.example.org/cross_subdomain")
-class MockCookieDispatch(Dispatcher):
+class MockCookieDispatch(AsyncDispatcher):
async def send(
self,
request: Request,
import httpx
from httpx.config import CertTypes, TimeoutTypes, VerifyTypes
from httpx.content_streams import encode
-from httpx.dispatch.base import Dispatcher
+from httpx.dispatch.base import AsyncDispatcher
from httpx.utils import format_form_param
-class MockDispatch(Dispatcher):
+class MockDispatch(AsyncDispatcher):
async def send(
self,
request: httpx.Request,