import hstspreload
+from .auth import BasicAuth
from .concurrency.asyncio import AsyncioBackend
from .concurrency.base import ConcurrencyBackend
from .config import (
from .dispatch.base import Dispatcher
from .dispatch.connection_pool import ConnectionPool
from .dispatch.proxy_http import HTTPProxy
-from .exceptions import HTTPError, InvalidURL
-from .middleware.base import BaseMiddleware
-from .middleware.basic_auth import BasicAuthMiddleware
-from .middleware.custom_auth import CustomAuthMiddleware
-from .middleware.redirect import RedirectMiddleware
+from .exceptions import (
+ HTTPError,
+ InvalidURL,
+ RedirectBodyUnavailable,
+ RedirectLoop,
+ TooManyRedirects,
+)
+from .middleware import Middleware
from .models import (
URL,
AuthTypes,
Response,
URLTypes,
)
+from .status_codes import codes
from .utils import ElapsedTimer, get_environment_proxies, get_logger, get_netrc
logger = get_logger(__name__)
if app is not None:
dispatch = ASGIDispatch(app=app, backend=backend)
- self.trust_env = True if trust_env is None else trust_env
-
if dispatch is None:
dispatch = ConnectionPool(
verify=verify,
http_versions=http_versions,
pool_limits=pool_limits,
backend=backend,
- trust_env=self.trust_env,
+ trust_env=trust_env,
uds=uds,
)
self._headers = Headers(headers)
self._cookies = Cookies(cookies)
self.max_redirects = max_redirects
+ self.trust_env = trust_env
self.dispatch = dispatch
self.concurrency_backend = backend
def params(self, params: QueryParamTypes) -> None:
self._params = QueryParams(params)
+ async def request(
+ self,
+ method: str,
+ url: URLTypes,
+ *,
+ data: RequestData = None,
+ files: RequestFiles = None,
+ json: typing.Any = None,
+ params: QueryParamTypes = None,
+ headers: HeaderTypes = None,
+ cookies: CookieTypes = None,
+ stream: bool = False,
+ auth: AuthTypes = None,
+ allow_redirects: bool = True,
+ cert: CertTypes = None,
+ verify: VerifyTypes = None,
+ timeout: TimeoutTypes = None,
+ trust_env: bool = None,
+ ) -> Response:
+ request = self.build_request(
+ method=method,
+ url=url,
+ data=data,
+ files=files,
+ json=json,
+ params=params,
+ headers=headers,
+ cookies=cookies,
+ )
+ response = await self.send(
+ request,
+ stream=stream,
+ auth=auth,
+ allow_redirects=allow_redirects,
+ verify=verify,
+ cert=cert,
+ timeout=timeout,
+ trust_env=trust_env,
+ )
+ return response
+
+ def build_request(
+ self,
+ method: str,
+ url: URLTypes,
+ *,
+ data: RequestData = None,
+ files: RequestFiles = None,
+ json: typing.Any = None,
+ params: QueryParamTypes = None,
+ headers: HeaderTypes = None,
+ cookies: CookieTypes = None,
+ ) -> Request:
+ """
+ Build and return a request instance.
+ """
+ url = self.merge_url(url)
+ headers = self.merge_headers(headers)
+ cookies = self.merge_cookies(cookies)
+ params = self.merge_queryparams(params)
+ return Request(
+ method,
+ url,
+ data=data,
+ files=files,
+ json=json,
+ params=params,
+ headers=headers,
+ cookies=cookies,
+ )
+
def merge_url(self, url: URLTypes) -> URL:
+ """
+ Merge a URL argument together with any 'base_url' on the client,
+ to create the URL used for the outgoing request.
+ """
url = self.base_url.join(relative_url=url)
if url.scheme == "http" and hstspreload.in_hsts_preload(url.host):
url = url.copy_with(scheme="https")
def merge_cookies(
self, cookies: CookieTypes = None
) -> typing.Optional[CookieTypes]:
+ """
+ Merge a cookies argument together with any cookies on the client,
+ to create the cookies used for the outgoing request.
+ """
if cookies or self.cookies:
merged_cookies = Cookies(self.cookies)
merged_cookies.update(cookies)
def merge_headers(
self, headers: HeaderTypes = None
) -> typing.Optional[HeaderTypes]:
+ """
+ Merge a headers argument together with any headers on the client,
+ to create the headers used for the outgoing request.
+ """
if headers or self.headers:
merged_headers = Headers(self.headers)
merged_headers.update(headers)
def merge_queryparams(
self, params: QueryParamTypes = None
) -> typing.Optional[QueryParamTypes]:
+ """
+ Merge a queryparams argument together with any queryparams on the client,
+ to create the queryparams used for the outgoing request.
+ """
if params or self.params:
merged_queryparams = QueryParams(self.params)
merged_queryparams.update(params)
return merged_queryparams
return params
- async def _get_response(
+ async def send(
self,
request: Request,
*,
if request.url.scheme not in ("http", "https"):
raise InvalidURL('URL scheme must be "http" or "https".')
- dispatch = self._dispatcher_for_request(request, self.proxies)
+ auth = self.auth if auth is None else auth
+ trust_env = self.trust_env if trust_env is None else trust_env
- async def get_response(request: Request) -> Response:
- try:
- with ElapsedTimer() as timer:
- response = await dispatch.send(
- request, verify=verify, cert=cert, timeout=timeout
- )
- response.elapsed = timer.elapsed
- response.request = request
- except HTTPError as exc:
- # Add the original request to any HTTPError unless
- # there'a already a request attached in the case of
- # a ProxyError.
- if exc.request is None:
- exc.request = request
- raise
-
- self.cookies.extract_cookies(response)
- if not stream:
- try:
- await response.read()
- finally:
- await response.close()
-
- status = f"{response.status_code} {response.reason_phrase}"
- response_line = f"{response.http_version} {status}"
- logger.debug(
- f'HTTP Request: {request.method} {request.url} "{response_line}"'
+ if not isinstance(auth, Middleware):
+ request = self.authenticate(request, trust_env, auth)
+ response = await self.send_handling_redirects(
+ request,
+ verify=verify,
+ cert=cert,
+ timeout=timeout,
+ allow_redirects=allow_redirects,
)
+ else:
+ get_response = functools.partial(
+ self.send_handling_redirects,
+ verify=verify,
+ cert=cert,
+ timeout=timeout,
+ allow_redirects=allow_redirects,
+ )
+ response = await auth(request, get_response)
- return response
+ if not stream:
+ try:
+ await response.read()
+ finally:
+ await response.close()
- def wrap(
- get_response: typing.Callable, middleware: BaseMiddleware
- ) -> typing.Callable:
- return functools.partial(middleware, get_response=get_response)
+ return response
- get_response = wrap(
- get_response,
- RedirectMiddleware(allow_redirects=allow_redirects, cookies=self.cookies),
- )
+ def authenticate(
+ self, request: Request, trust_env: bool, auth: AuthTypes = None
+ ) -> "Request":
+ if auth is not None:
+ if isinstance(auth, tuple):
+ auth = BasicAuth(username=auth[0], password=auth[1])
+ return auth(request)
- auth_middleware = self._get_auth_middleware(
- request=request,
- trust_env=self.trust_env if trust_env is None else trust_env,
- auth=self.auth if auth is None else auth,
- )
+ username, password = request.url.username, request.url.password
+ if username or password:
+ auth = BasicAuth(username=username, password=password)
+ return auth(request)
- if auth_middleware is not None:
- get_response = wrap(get_response, auth_middleware)
+ if trust_env:
+ netrc_info = self._get_netrc()
+ if netrc_info is not None:
+ netrc_login = netrc_info.authenticators(request.url.authority)
+ netrc_username, _, netrc_password = netrc_login or ("", None, None)
+ if netrc_password is not None:
+ auth = BasicAuth(username=netrc_username, password=netrc_password)
+ return auth(request)
- return await get_response(request)
+ return request
- def _get_auth_middleware(
- self, request: Request, trust_env: bool, auth: AuthTypes = None
- ) -> typing.Optional[BaseMiddleware]:
- if isinstance(auth, tuple):
- return BasicAuthMiddleware(username=auth[0], password=auth[1])
- elif isinstance(auth, BaseMiddleware):
- return auth
- elif callable(auth):
- return CustomAuthMiddleware(auth=auth)
+ async def send_handling_redirects(
+ self,
+ request: Request,
+ verify: VerifyTypes = None,
+ cert: CertTypes = None,
+ timeout: TimeoutTypes = None,
+ allow_redirects: bool = True,
+ history: typing.List[Response] = None,
+ ) -> Response:
+ if history is None:
+ history = []
- if auth is not None:
- raise TypeError(
- 'When specified, "auth" must be a (username, password) tuple or '
- "a callable with signature (Request) -> Request "
- f"(got {auth!r})"
- )
+ while True:
+ if len(history) > self.max_redirects:
+ raise TooManyRedirects()
+ if request.url in (response.url for response in history):
+ raise RedirectLoop()
- if request.url.username or request.url.password:
- return BasicAuthMiddleware(
- username=request.url.username, password=request.url.password
+ response = await self.send_single_request(
+ request, verify=verify, cert=cert, timeout=timeout
)
+ response.history = list(history)
+
+ if not response.is_redirect:
+ return response
+
+ await response.close()
+ request = self.build_redirect_request(request, response)
+ history = history + [response]
+
+ if not allow_redirects:
+ response.call_next = functools.partial(
+ self.send_handling_redirects,
+ request=request,
+ verify=verify,
+ cert=cert,
+ timeout=timeout,
+ allow_redirects=False,
+ history=history,
+ )
+ return response
+
+ def build_redirect_request(self, request: Request, response: Response) -> Request:
+ """
+ Given a request and a redirect response, return a new request that
+ should be used to effect the redirect.
+ """
+ method = self.redirect_method(request, response)
+ url = self.redirect_url(request, response)
+ headers = self.redirect_headers(request, url, method)
+ content = self.redirect_content(request, method)
+ cookies = Cookies(self.cookies)
+ return Request(
+ method=method, url=url, headers=headers, data=content, cookies=cookies
+ )
- if trust_env:
- netrc_info = self._get_netrc()
- if netrc_info:
- netrc_login = netrc_info.authenticators(request.url.authority)
- if netrc_login:
- username, _, password = netrc_login
- assert password is not None
- return BasicAuthMiddleware(username=username, password=password)
+ def redirect_method(self, request: Request, response: Response) -> str:
+ """
+ When being redirected we may want to change the method of the request
+ based on certain specs or browser behavior.
+ """
+ method = request.method
+
+ # https://tools.ietf.org/html/rfc7231#section-6.4.4
+ if response.status_code == codes.SEE_OTHER and method != "HEAD":
+ method = "GET"
+
+ # Do what the browsers do, despite standards...
+ # Turn 302s into GETs.
+ if response.status_code == codes.FOUND and method != "HEAD":
+ method = "GET"
+
+ # If a POST is responded to with a 301, turn it into a GET.
+ # This bizarre behaviour is explained in 'requests' issue 1704.
+ if response.status_code == codes.MOVED_PERMANENTLY and method == "POST":
+ method = "GET"
- return None
+ return method
+
+ def redirect_url(self, request: Request, response: Response) -> URL:
+ """
+ Return the URL for the redirect to follow.
+ """
+ location = response.headers["Location"]
+
+ url = URL(location, allow_relative=True)
+
+ # Facilitate relative 'Location' headers, as allowed by RFC 7231.
+ # (e.g. '/path/to/resource' instead of 'http://domain.tld/path/to/resource')
+ if url.is_relative_url:
+ url = request.url.join(url)
+
+ # Attach previous fragment if needed (RFC 7231 7.1.2)
+ if request.url.fragment and not url.fragment:
+ url = url.copy_with(fragment=request.url.fragment)
+
+ return url
+
+ def redirect_headers(self, request: Request, url: URL, method: str) -> Headers:
+ """
+ Return the headers that should be used for the redirect request.
+ """
+ headers = Headers(request.headers)
+
+ if url.origin != request.url.origin:
+ # Strip Authorization headers when responses are redirected away from
+ # the origin.
+ headers.pop("Authorization", None)
+ headers["Host"] = url.authority
+
+ if method != request.method and method == "GET":
+ # If we've switch to a 'GET' request, then strip any headers which
+ # are only relevant to the request body.
+ headers.pop("Content-Length", None)
+ headers.pop("Transfer-Encoding", None)
+
+ # We should use the client cookie store to determine any cookie header,
+ # rather than whatever was on the original outgoing request.
+ headers.pop("Cookie", None)
+
+ return headers
+
+ def redirect_content(self, request: Request, method: str) -> bytes:
+ """
+ Return the body that should be used for the redirect request.
+ """
+ if method != request.method and method == "GET":
+ return b""
+ if request.is_streaming:
+ raise RedirectBodyUnavailable()
+ return request.content
+
+ async def send_single_request(
+ self,
+ request: Request,
+ verify: VerifyTypes = None,
+ cert: CertTypes = None,
+ timeout: TimeoutTypes = None,
+ ) -> Response:
+ """
+ Sends a single request, without handling any redirections.
+ """
+
+ dispatcher = self._dispatcher_for_request(request, self.proxies)
+
+ try:
+ with ElapsedTimer() as timer:
+ response = await dispatcher.send(
+ request, verify=verify, cert=cert, timeout=timeout
+ )
+ response.elapsed = timer.elapsed
+ response.request = request
+ except HTTPError as exc:
+ # Add the original request to any HTTPError unless
+ # there'a already a request attached in the case of
+ # a ProxyError.
+ if exc.request is None:
+ exc.request = request
+ raise
+
+ self.cookies.extract_cookies(response)
+
+ status = f"{response.status_code} {response.reason_phrase}"
+ response_line = f"{response.http_version} {status}"
+ logger.debug(f'HTTP Request: {request.method} {request.url} "{response_line}"')
+
+ return response
@functools.lru_cache(1)
def _get_netrc(self) -> typing.Optional[netrc.netrc]:
return self.dispatch
- def build_request(
- self,
- method: str,
- url: URLTypes,
- *,
- data: RequestData = None,
- files: RequestFiles = None,
- json: typing.Any = None,
- params: QueryParamTypes = None,
- headers: HeaderTypes = None,
- cookies: CookieTypes = None,
- ) -> Request:
- """
- Build and return a request instance.
- """
- url = self.merge_url(url)
- headers = self.merge_headers(headers)
- cookies = self.merge_cookies(cookies)
- params = self.merge_queryparams(params)
- return Request(
- method,
- url,
- data=data,
- files=files,
- json=json,
- params=params,
- headers=headers,
- cookies=cookies,
- )
-
async def get(
self,
url: URLTypes,
trust_env=trust_env,
)
- async def request(
- self,
- method: str,
- url: URLTypes,
- *,
- data: RequestData = None,
- files: RequestFiles = None,
- json: typing.Any = None,
- params: QueryParamTypes = None,
- headers: HeaderTypes = None,
- cookies: CookieTypes = None,
- stream: bool = False,
- auth: AuthTypes = None,
- allow_redirects: bool = True,
- cert: CertTypes = None,
- verify: VerifyTypes = None,
- timeout: TimeoutTypes = None,
- trust_env: bool = None,
- ) -> Response:
- request = self.build_request(
- method=method,
- url=url,
- data=data,
- files=files,
- json=json,
- params=params,
- headers=headers,
- cookies=cookies,
- )
- response = await self.send(
- request,
- stream=stream,
- auth=auth,
- allow_redirects=allow_redirects,
- verify=verify,
- cert=cert,
- timeout=timeout,
- trust_env=trust_env,
- )
- return response
-
- async def send(
- self,
- request: Request,
- *,
- stream: bool = False,
- auth: AuthTypes = None,
- allow_redirects: bool = True,
- verify: VerifyTypes = None,
- cert: CertTypes = None,
- timeout: TimeoutTypes = None,
- trust_env: bool = None,
- ) -> Response:
- return await self._get_response(
- request=request,
- stream=stream,
- auth=auth,
- allow_redirects=allow_redirects,
- verify=verify,
- cert=cert,
- timeout=timeout,
- trust_env=trust_env,
- )
-
async def close(self) -> None:
await self.dispatch.close()
+++ /dev/null
-import functools
-import typing
-
-from ..config import DEFAULT_MAX_REDIRECTS
-from ..exceptions import RedirectBodyUnavailable, RedirectLoop, TooManyRedirects
-from ..models import URL, Cookies, Headers, Request, Response
-from ..status_codes import codes
-from .base import BaseMiddleware
-
-
-class RedirectMiddleware(BaseMiddleware):
- def __init__(
- self,
- allow_redirects: bool = True,
- max_redirects: int = DEFAULT_MAX_REDIRECTS,
- cookies: typing.Optional[Cookies] = None,
- ):
- self.allow_redirects = allow_redirects
- self.max_redirects = max_redirects
- self.cookies = cookies
- self.history: typing.List[Response] = []
-
- async def __call__(
- self, request: Request, get_response: typing.Callable
- ) -> Response:
- if len(self.history) > self.max_redirects:
- raise TooManyRedirects()
- if request.url in (response.url for response in self.history):
- raise RedirectLoop()
-
- response = await get_response(request)
- response.history = list(self.history)
-
- if not response.is_redirect:
- return response
-
- self.history.append(response)
- next_request = self.build_redirect_request(request, response)
-
- if self.allow_redirects:
- return await self(next_request, get_response)
-
- response.call_next = functools.partial(self, next_request, get_response)
- return response
-
- def build_redirect_request(self, request: Request, response: Response) -> Request:
- method = self.redirect_method(request, response)
- url = self.redirect_url(request, response)
- headers = self.redirect_headers(request, url, method) # TODO: merge headers?
- content = self.redirect_content(request, method)
- cookies = Cookies(self.cookies)
- return Request(
- method=method, url=url, headers=headers, data=content, cookies=cookies
- )
-
- def redirect_method(self, request: Request, response: Response) -> str:
- """
- When being redirected we may want to change the method of the request
- based on certain specs or browser behavior.
- """
- method = request.method
-
- # https://tools.ietf.org/html/rfc7231#section-6.4.4
- if response.status_code == codes.SEE_OTHER and method != "HEAD":
- method = "GET"
-
- # Do what the browsers do, despite standards...
- # Turn 302s into GETs.
- if response.status_code == codes.FOUND and method != "HEAD":
- method = "GET"
-
- # If a POST is responded to with a 301, turn it into a GET.
- # This bizarre behaviour is explained in 'requests' issue 1704.
- if response.status_code == codes.MOVED_PERMANENTLY and method == "POST":
- method = "GET"
-
- return method
-
- def redirect_url(self, request: Request, response: Response) -> URL:
- """
- Return the URL for the redirect to follow.
- """
- location = response.headers["Location"]
-
- url = URL(location, allow_relative=True)
-
- # Facilitate relative 'Location' headers, as allowed by RFC 7231.
- # (e.g. '/path/to/resource' instead of 'http://domain.tld/path/to/resource')
- if url.is_relative_url:
- url = request.url.join(url)
-
- # Attach previous fragment if needed (RFC 7231 7.1.2)
- if request.url.fragment and not url.fragment:
- url = url.copy_with(fragment=request.url.fragment)
-
- return url
-
- def redirect_headers(self, request: Request, url: URL, method: str) -> Headers:
- """
- Return the headers that should be used for the redirect request.
- """
- headers = Headers(request.headers)
-
- if url.origin != request.url.origin:
- # Strip Authorization headers when responses are redirected away from
- # the origin.
- headers.pop("Authorization", None)
- headers["Host"] = url.authority
-
- if method != request.method and method == "GET":
- # If we've switch to a 'GET' request, then strip any headers which
- # are only relevant to the request body.
- headers.pop("Content-Length", None)
- headers.pop("Transfer-Encoding", None)
-
- # We should use the client cookie store to determine any cookie header,
- # rather than whatever was on the original outgoing request.
- headers.pop("Cookie", None)
-
- return headers
-
- def redirect_content(self, request: Request, method: str) -> bytes:
- """
- Return the body that should be used for the redirect request.
- """
- if method != request.method and method == "GET":
- return b""
- if request.is_streaming:
- raise RedirectBodyUnavailable()
- return request.content