UNSET,
CertTypes,
PoolLimits,
+ ProxiesTypes,
+ Proxy,
Timeout,
TimeoutTypes,
UnsetType,
CookieTypes,
Headers,
HeaderTypes,
- ProxiesTypes,
QueryParams,
QueryParamTypes,
Request,
trust_env: bool = True,
uds: str = None,
):
- if app is not None:
- dispatch = ASGIDispatch(app=app)
-
- if dispatch is None:
- dispatch = ConnectionPool(
- verify=verify,
- cert=cert,
- http2=http2,
- pool_limits=pool_limits,
- backend=backend,
- trust_env=trust_env,
- uds=uds,
- )
-
if base_url is None:
self.base_url = URL("", allow_relative=True)
else:
if params is None:
params = {}
+ if proxies is None and trust_env:
+ proxies = typing.cast(ProxiesTypes, get_environment_proxies())
+
self.auth = auth
self._params = QueryParams(params)
self._headers = Headers(headers)
self.timeout = Timeout(timeout)
self.max_redirects = max_redirects
self.trust_env = trust_env
- self.dispatch = dispatch
self.netrc = NetRCInfo()
+ self.dispatch = self.init_dispatch(
+ verify=verify,
+ cert=cert,
+ http2=http2,
+ pool_limits=pool_limits,
+ dispatch=dispatch,
+ app=app,
+ backend=backend,
+ trust_env=trust_env,
+ uds=uds,
+ )
+
if proxies is None and trust_env:
proxies = typing.cast(ProxiesTypes, get_environment_proxies())
- self.proxies: typing.Dict[str, AsyncDispatcher] = _proxies_to_dispatchers(
+ self.proxies: typing.Dict[str, AsyncDispatcher] = self.proxies_to_dispatchers(
proxies,
verify=verify,
cert=cert,
trust_env=trust_env,
)
+ def init_dispatch(
+ self,
+ verify: VerifyTypes = True,
+ cert: CertTypes = None,
+ http2: bool = False,
+ pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
+ dispatch: AsyncDispatcher = None,
+ app: typing.Callable = None,
+ backend: typing.Union[str, ConcurrencyBackend] = "auto",
+ trust_env: bool = True,
+ uds: str = None,
+ ) -> AsyncDispatcher:
+ if dispatch is not None:
+ return dispatch
+
+ if app is not None:
+ return ASGIDispatch(app=app)
+
+ return ConnectionPool(
+ verify=verify,
+ cert=cert,
+ http2=http2,
+ pool_limits=pool_limits,
+ backend=backend,
+ trust_env=trust_env,
+ uds=uds,
+ )
+
+ def init_proxy_dispatch(
+ self,
+ proxy: Proxy,
+ verify: VerifyTypes = True,
+ cert: CertTypes = None,
+ http2: bool = False,
+ pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
+ backend: typing.Union[str, ConcurrencyBackend] = "auto",
+ trust_env: bool = True,
+ ) -> AsyncDispatcher:
+ return HTTPProxy(
+ proxy_url=proxy.url,
+ proxy_headers=proxy.headers,
+ proxy_mode=proxy.mode,
+ verify=verify,
+ cert=cert,
+ http2=http2,
+ pool_limits=pool_limits,
+ backend=backend,
+ trust_env=trust_env,
+ )
+
+ def proxies_to_dispatchers(
+ self,
+ proxies: typing.Optional[ProxiesTypes],
+ verify: VerifyTypes,
+ cert: typing.Optional[CertTypes],
+ http2: bool,
+ pool_limits: PoolLimits,
+ backend: typing.Union[str, ConcurrencyBackend],
+ trust_env: bool,
+ ) -> typing.Dict[str, AsyncDispatcher]:
+ if proxies is None:
+ return {}
+ elif isinstance(proxies, (str, URL, Proxy)):
+ proxy = Proxy(url=proxies) if isinstance(proxies, (str, URL)) else proxies
+ return {
+ "all": self.init_proxy_dispatch(
+ proxy=proxy,
+ verify=verify,
+ cert=cert,
+ pool_limits=pool_limits,
+ backend=backend,
+ trust_env=trust_env,
+ http2=http2,
+ )
+ }
+ elif isinstance(proxies, AsyncDispatcher): # pragma: nocover
+ return {"all": proxies}
+ # We're supporting this style for now, but we'll want to deprecate it.
+ #
+ # raise RuntimeError(
+ # "Passing a AsyncDispatcher instance to 'proxies=' is no longer
+ # supported. Use `httpx.Proxy() instead.`"
+ # )
+ else:
+ new_proxies = {}
+ for key, value in proxies.items():
+ if isinstance(value, (str, URL, Proxy)):
+ proxy = Proxy(url=value) if isinstance(value, (str, URL)) else value
+ new_proxies[str(key)] = self.init_proxy_dispatch(
+ proxy=proxy,
+ verify=verify,
+ cert=cert,
+ pool_limits=pool_limits,
+ backend=backend,
+ trust_env=trust_env,
+ http2=http2,
+ )
+ elif isinstance(value, AsyncDispatcher): # pragma: nocover
+ new_proxies[str(key)] = value
+ # We're supporting this style for now, but we'll want to
+ # deprecate it.
+ #
+ # raise RuntimeError(
+ # "Passing a AsyncDispatcher instance to 'proxies=' is "
+ # "no longer supported. Use `httpx.Proxy() instead.`"
+ # )
+ return new_proxies
+
@property
def headers(self) -> Headers:
"""
def params(self, params: QueryParamTypes) -> None:
self._params = QueryParams(params)
- async def request(
- self,
- method: str,
- url: URLTypes,
- *,
- data: RequestData = None,
- files: RequestFiles = None,
- json: typing.Any = None,
- params: QueryParamTypes = None,
- headers: HeaderTypes = None,
- cookies: CookieTypes = None,
- stream: bool = False,
- auth: AuthTypes = None,
- allow_redirects: bool = True,
- cert: CertTypes = None,
- verify: VerifyTypes = None,
- timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
- trust_env: bool = None,
- ) -> Response:
- if cert is not None: # pragma: nocover
- raise RuntimeError(
- "Passing a 'cert' argument when making a request on a client "
- "is not supported anymore. Instantiate a new client instead, "
- "passing any 'cert' arguments to the client itself."
- )
-
- if verify is not None: # pragma: nocover
- raise RuntimeError(
- "Passing a 'verify' argument when making a request on a client "
- "is not supported anymore. Instantiate a new client instead, "
- "passing any 'verify' arguments to the client itself."
- )
-
- if trust_env is not None: # pragma: nocover
- raise RuntimeError(
- "Passing a 'trust_env' argument when making a request on a client "
- "is not supported anymore. Instantiate a new client instead, "
- "passing any 'trust_env' argument to the client itself."
- )
-
- if stream: # pragma: nocover
- warnings.warn(
- "The 'stream=True' argument is due to be deprecated. "
- "Use 'async with client.stream(method, url, ...) as response' instead."
- )
-
- request = self.build_request(
- method=method,
- url=url,
- data=data,
- files=files,
- json=json,
- params=params,
- headers=headers,
- cookies=cookies,
- )
- response = await self.send(
- request,
- stream=stream,
- auth=auth,
- allow_redirects=allow_redirects,
- timeout=timeout,
- )
- return response
-
def stream(
self,
method: str,
return merged_queryparams
return params
- async def send(
- self,
- request: Request,
- *,
- stream: bool = False,
- auth: AuthTypes = None,
- allow_redirects: bool = True,
- timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
- ) -> Response:
- if request.url.scheme not in ("http", "https"):
- raise InvalidURL('URL scheme must be "http" or "https".')
-
- timeout = self.timeout if isinstance(timeout, UnsetType) else Timeout(timeout)
-
- auth = self.setup_auth(request, auth)
-
- response = await self.send_handling_redirects(
- request, auth=auth, timeout=timeout, allow_redirects=allow_redirects,
- )
-
- if not stream:
- try:
- await response.aread()
- finally:
- await response.aclose()
-
- return response
-
- def setup_auth(self, request: Request, auth: AuthTypes = None) -> Auth:
+ def build_auth(self, request: Request, auth: AuthTypes = None) -> Auth:
auth = self.auth if auth is None else auth
if auth is not None:
return Auth()
- async def send_handling_redirects(
- self,
- request: Request,
- auth: Auth,
- timeout: Timeout,
- allow_redirects: bool = True,
- history: typing.List[Response] = None,
- ) -> Response:
- if history is None:
- history = []
-
- while True:
- if len(history) > self.max_redirects:
- raise TooManyRedirects()
- if request.url in (response.url for response in history):
- raise RedirectLoop()
-
- response = await self.send_handling_auth(
- request, history, auth=auth, timeout=timeout,
- )
- response.history = list(history)
-
- if not response.is_redirect:
- return response
-
- await response.aread()
- request = self.build_redirect_request(request, response)
- history = history + [response]
-
- if not allow_redirects:
- response.call_next = functools.partial(
- self.send_handling_redirects,
- request=request,
- auth=auth,
- timeout=timeout,
- allow_redirects=False,
- history=history,
- )
- return response
-
def build_redirect_request(self, request: Request, response: Response) -> Request:
"""
Given a request and a redirect response, return a new request that
return request.stream
+ def dispatcher_for_url(self, url: URL) -> AsyncDispatcher:
+ """
+ Returns the AsyncDispatcher instance that should be used for a given URL.
+ This will either be the standard connection pool, or a proxy.
+ """
+ if self.proxies:
+ is_default_port = (url.scheme == "http" and url.port == 80) or (
+ url.scheme == "https" and url.port == 443
+ )
+ hostname = f"{url.host}:{url.port}"
+ proxy_keys = (
+ f"{url.scheme}://{hostname}",
+ f"{url.scheme}://{url.host}" if is_default_port else None,
+ f"all://{hostname}",
+ f"all://{url.host}" if is_default_port else None,
+ url.scheme,
+ "all",
+ )
+ for proxy_key in proxy_keys:
+ if proxy_key and proxy_key in self.proxies:
+ dispatcher = self.proxies[proxy_key]
+ return dispatcher
+
+ return self.dispatch
+
+ async def request(
+ self,
+ method: str,
+ url: URLTypes,
+ *,
+ data: RequestData = None,
+ files: RequestFiles = None,
+ json: typing.Any = None,
+ params: QueryParamTypes = None,
+ headers: HeaderTypes = None,
+ cookies: CookieTypes = None,
+ stream: bool = False,
+ auth: AuthTypes = None,
+ allow_redirects: bool = True,
+ cert: CertTypes = None,
+ verify: VerifyTypes = None,
+ timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
+ trust_env: bool = None,
+ ) -> Response:
+ if cert is not None: # pragma: nocover
+ raise RuntimeError(
+ "Passing a 'cert' argument when making a request on a client "
+ "is not supported anymore. Instantiate a new client instead, "
+ "passing any 'cert' arguments to the client itself."
+ )
+
+ if verify is not None: # pragma: nocover
+ raise RuntimeError(
+ "Passing a 'verify' argument when making a request on a client "
+ "is not supported anymore. Instantiate a new client instead, "
+ "passing any 'verify' arguments to the client itself."
+ )
+
+ if trust_env is not None: # pragma: nocover
+ raise RuntimeError(
+ "Passing a 'trust_env' argument when making a request on a client "
+ "is not supported anymore. Instantiate a new client instead, "
+ "passing any 'trust_env' argument to the client itself."
+ )
+
+ if stream: # pragma: nocover
+ warnings.warn(
+ "The 'stream=True' argument is due to be deprecated. "
+ "Use 'async with client.stream(method, url, ...) as response' instead."
+ )
+
+ request = self.build_request(
+ method=method,
+ url=url,
+ data=data,
+ files=files,
+ json=json,
+ params=params,
+ headers=headers,
+ cookies=cookies,
+ )
+ response = await self.send(
+ request,
+ stream=stream,
+ auth=auth,
+ allow_redirects=allow_redirects,
+ timeout=timeout,
+ )
+ return response
+
+ async def send(
+ self,
+ request: Request,
+ *,
+ stream: bool = False,
+ auth: AuthTypes = None,
+ allow_redirects: bool = True,
+ timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
+ ) -> Response:
+ if request.url.scheme not in ("http", "https"):
+ raise InvalidURL('URL scheme must be "http" or "https".')
+
+ timeout = self.timeout if isinstance(timeout, UnsetType) else Timeout(timeout)
+
+ auth = self.build_auth(request, auth)
+
+ response = await self.send_handling_redirects(
+ request, auth=auth, timeout=timeout, allow_redirects=allow_redirects,
+ )
+
+ if not stream:
+ try:
+ await response.aread()
+ finally:
+ await response.aclose()
+
+ return response
+
+ async def send_handling_redirects(
+ self,
+ request: Request,
+ auth: Auth,
+ timeout: Timeout,
+ allow_redirects: bool = True,
+ history: typing.List[Response] = None,
+ ) -> Response:
+ if history is None:
+ history = []
+
+ while True:
+ if len(history) > self.max_redirects:
+ raise TooManyRedirects()
+ if request.url in (response.url for response in history):
+ raise RedirectLoop()
+
+ response = await self.send_handling_auth(
+ request, auth=auth, timeout=timeout, history=history
+ )
+ response.history = list(history)
+
+ if not response.is_redirect:
+ return response
+
+ await response.aread()
+ request = self.build_redirect_request(request, response)
+ history = history + [response]
+
+ if not allow_redirects:
+ response.call_next = functools.partial(
+ self.send_handling_redirects,
+ request=request,
+ auth=auth,
+ timeout=timeout,
+ allow_redirects=False,
+ history=history,
+ )
+ return response
+
async def send_handling_auth(
self,
request: Request,
return response
- def dispatcher_for_url(self, url: URL) -> AsyncDispatcher:
- """
- Returns the AsyncDispatcher instance that should be used for a given URL.
- This will either be the standard connection pool, or a proxy.
- """
- if self.proxies:
- is_default_port = (url.scheme == "http" and url.port == 80) or (
- url.scheme == "https" and url.port == 443
- )
- hostname = f"{url.host}:{url.port}"
- proxy_keys = (
- f"{url.scheme}://{hostname}",
- f"{url.scheme}://{url.host}" if is_default_port else None,
- f"all://{hostname}",
- f"all://{url.host}" if is_default_port else None,
- url.scheme,
- "all",
- )
- for proxy_key in proxy_keys:
- if proxy_key and proxy_key in self.proxies:
- return self.proxies[proxy_key]
-
- return self.dispatch
-
async def get(
self,
url: URLTypes,
await self.aclose()
-def _proxies_to_dispatchers(
- proxies: typing.Optional[ProxiesTypes],
- verify: VerifyTypes,
- cert: typing.Optional[CertTypes],
- http2: bool,
- pool_limits: PoolLimits,
- backend: typing.Union[str, ConcurrencyBackend],
- trust_env: bool,
-) -> typing.Dict[str, AsyncDispatcher]:
- def _proxy_from_url(url: URLTypes) -> AsyncDispatcher:
- nonlocal verify, cert, http2, pool_limits, backend, trust_env
- url = URL(url)
- if url.scheme in ("http", "https"):
- return HTTPProxy(
- url,
- verify=verify,
- cert=cert,
- pool_limits=pool_limits,
- backend=backend,
- trust_env=trust_env,
- http2=http2,
- )
- raise ValueError(f"Unknown proxy for {url!r}")
-
- if proxies is None:
- return {}
- elif isinstance(proxies, (str, URL)):
- return {"all": _proxy_from_url(proxies)}
- elif isinstance(proxies, AsyncDispatcher):
- return {"all": proxies}
- else:
- new_proxies = {}
- for key, dispatcher_or_url in proxies.items():
- if isinstance(dispatcher_or_url, (str, URL)):
- new_proxies[str(key)] = _proxy_from_url(dispatcher_or_url)
- else:
- new_proxies[str(key)] = dispatcher_or_url
- return new_proxies
-
-
class StreamContextManager:
def __init__(
self,