DEFAULT_MAX_REDIRECTS,
DEFAULT_TIMEOUT_CONFIG,
Limits,
- Proxy,
Timeout,
)
from ._decoders import SUPPORTED_DECODERS
)
from ._urls import URL, QueryParams
from ._utils import (
- URLPattern,
- get_environment_proxies,
is_https_redirect,
same_origin,
)
return url
return url.copy_with(raw_path=url.raw_path + b"/")
- def _get_proxy_map(
- self, proxy: ProxyTypes | None, allow_env_proxies: bool
- ) -> dict[str, Proxy | None]:
- if proxy is None:
- if allow_env_proxies:
- return {
- key: None if url is None else Proxy(url=url)
- for key, url in get_environment_proxies().items()
- }
- return {}
- else:
- proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy
- return {"all://": proxy}
-
@property
def timeout(self) -> Timeout:
return self._timeout
http1: bool = True,
http2: bool = False,
proxy: ProxyTypes | None = None,
- mounts: None | (typing.Mapping[str, BaseTransport | None]) = None,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
follow_redirects: bool = False,
limits: Limits = DEFAULT_LIMITS,
default_encoding=default_encoding,
)
- if http2:
- try:
- import h2 # noqa
- except ImportError: # pragma: no cover
- raise ImportError(
- "Using http2=True, but the 'h2' package is not installed. "
- "Make sure to install httpx using `pip install httpx[http2]`."
- ) from None
-
- allow_env_proxies = trust_env and transport is None
- proxy_map = self._get_proxy_map(proxy, allow_env_proxies)
-
- self._transport = self._init_transport(
- ssl_context=ssl_context,
- http1=http1,
- http2=http2,
- limits=limits,
- transport=transport,
- trust_env=trust_env,
- # Deprecated in favor of ssl_context...
- verify=verify,
- cert=cert,
- )
- self._mounts: dict[URLPattern, BaseTransport | None] = {
- URLPattern(key): None
- if proxy is None
- else self._init_proxy_transport(
- proxy,
+ if transport is not None:
+ self._transport = transport
+ else:
+ self._transport = HTTPTransport(
ssl_context=ssl_context,
+ proxy=proxy,
http1=http1,
http2=http2,
limits=limits,
- # Deprecated in favor of ssl_context...
verify=verify,
cert=cert,
)
- for key, proxy in proxy_map.items()
- }
- if mounts is not None:
- self._mounts.update(
- {URLPattern(key): transport for key, transport in mounts.items()}
- )
-
- self._mounts = dict(sorted(self._mounts.items()))
-
- def _init_transport(
- self,
- ssl_context: ssl.SSLContext | None = None,
- http1: bool = True,
- http2: bool = False,
- limits: Limits = DEFAULT_LIMITS,
- transport: BaseTransport | None = None,
- trust_env: bool = True,
- # Deprecated in favor of `ssl_context`...
- verify: typing.Any = None,
- cert: typing.Any = None,
- ) -> BaseTransport:
- if transport is not None:
- return transport
-
- return HTTPTransport(
- ssl_context=ssl_context,
- http1=http1,
- http2=http2,
- limits=limits,
- verify=verify,
- cert=cert,
- )
-
- def _init_proxy_transport(
- self,
- proxy: Proxy,
- ssl_context: ssl.SSLContext | None = None,
- http1: bool = True,
- http2: bool = False,
- limits: Limits = DEFAULT_LIMITS,
- trust_env: bool = True,
- # Deprecated in favor of `ssl_context`...
- verify: typing.Any = None,
- cert: typing.Any = None,
- ) -> BaseTransport:
- return HTTPTransport(
- ssl_context=ssl_context,
- http1=http1,
- http2=http2,
- limits=limits,
- proxy=proxy,
- verify=verify,
- cert=cert,
- )
-
- def _transport_for_url(self, url: URL) -> BaseTransport:
- """
- Returns the transport instance that should be used for a given URL.
- This will either be the standard connection pool, or a proxy.
- """
- for pattern, transport in self._mounts.items():
- if pattern.matches(url):
- return self._transport if transport is None else transport
+ @property
+ def transport(self) -> BaseTransport:
return self._transport
def request(
"""
Sends a single request, without handling any redirections.
"""
- transport = self._transport_for_url(request.url)
start = time.perf_counter()
if not isinstance(request.stream, SyncByteStream):
)
with request_context(request=request):
- response = transport.handle_request(request)
+ response = self.transport.handle_request(request)
assert isinstance(response.stream, SyncByteStream)
def close(self) -> None:
"""
- Close transport and proxies.
+ Close transport.
"""
if self._state != ClientState.CLOSED:
self._state = ClientState.CLOSED
-
- self._transport.close()
- for transport in self._mounts.values():
- if transport is not None:
- transport.close()
+ self.transport.close()
def __enter__(self: T) -> T:
if self._state != ClientState.UNOPENED:
raise RuntimeError(msg)
self._state = ClientState.OPENED
-
- self._transport.__enter__()
- for transport in self._mounts.values():
- if transport is not None:
- transport.__enter__()
+ self.transport.__enter__()
return self
def __exit__(
traceback: TracebackType | None = None,
) -> None:
self._state = ClientState.CLOSED
-
- self._transport.__exit__(exc_type, exc_value, traceback)
- for transport in self._mounts.values():
- if transport is not None:
- transport.__exit__(exc_type, exc_value, traceback)
+ self.transport.__exit__(exc_type, exc_value, traceback)
class AsyncClient(BaseClient):
http1: bool = True,
http2: bool = False,
proxy: ProxyTypes | None = None,
- mounts: None | (typing.Mapping[str, AsyncBaseTransport | None]) = None,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
follow_redirects: bool = False,
limits: Limits = DEFAULT_LIMITS,
trust_env=trust_env,
default_encoding=default_encoding,
)
-
- if http2:
- try:
- import h2 # noqa
- except ImportError: # pragma: no cover
- raise ImportError(
- "Using http2=True, but the 'h2' package is not installed. "
- "Make sure to install httpx using `pip install httpx[http2]`."
- ) from None
-
- allow_env_proxies = trust_env and transport is None
- proxy_map = self._get_proxy_map(proxy, allow_env_proxies)
-
- self._transport = self._init_transport(
- ssl_context=ssl_context,
- http1=http1,
- http2=http2,
- limits=limits,
- transport=transport,
- # Deprecated in favor of ssl_context
- verify=verify,
- cert=cert,
- )
-
- self._mounts: dict[URLPattern, AsyncBaseTransport | None] = {
- URLPattern(key): None
- if proxy is None
- else self._init_proxy_transport(
- proxy,
+ if transport is not None:
+ self._transport = transport
+ else:
+ self._transport = AsyncHTTPTransport(
ssl_context=ssl_context,
+ proxy=proxy,
http1=http1,
http2=http2,
limits=limits,
- # Deprecated in favor of `ssl_context`...
verify=verify,
cert=cert,
)
- for key, proxy in proxy_map.items()
- }
- if mounts is not None:
- self._mounts.update(
- {URLPattern(key): transport for key, transport in mounts.items()}
- )
- self._mounts = dict(sorted(self._mounts.items()))
-
- def _init_transport(
- self,
- ssl_context: ssl.SSLContext | None = None,
- http1: bool = True,
- http2: bool = False,
- limits: Limits = DEFAULT_LIMITS,
- transport: AsyncBaseTransport | None = None,
- # Deprecated in favor of `ssl_context`...
- verify: typing.Any = None,
- cert: typing.Any = None,
- ) -> AsyncBaseTransport:
- if transport is not None:
- return transport
-
- return AsyncHTTPTransport(
- ssl_context=ssl_context,
- http1=http1,
- http2=http2,
- limits=limits,
- verify=verify,
- cert=cert,
- )
-
- def _init_proxy_transport(
- self,
- proxy: Proxy,
- ssl_context: ssl.SSLContext | None = None,
- http1: bool = True,
- http2: bool = False,
- limits: Limits = DEFAULT_LIMITS,
- # Deprecated in favor of `ssl_context`...
- verify: typing.Any = None,
- cert: typing.Any = None,
- ) -> AsyncBaseTransport:
- return AsyncHTTPTransport(
- ssl_context=ssl_context,
- http1=http1,
- http2=http2,
- limits=limits,
- proxy=proxy,
- verify=verify,
- cert=cert,
- )
-
- def _transport_for_url(self, url: URL) -> AsyncBaseTransport:
- """
- Returns the transport instance that should be used for a given URL.
- This will either be the standard connection pool, or a proxy.
- """
- for pattern, transport in self._mounts.items():
- if pattern.matches(url):
- return self._transport if transport is None else transport
+ @property
+ def transport(self) -> AsyncBaseTransport:
return self._transport
async def request(
"""
Sends a single request, without handling any redirections.
"""
- transport = self._transport_for_url(request.url)
start = time.perf_counter()
if not isinstance(request.stream, AsyncByteStream):
)
with request_context(request=request):
- response = await transport.handle_async_request(request)
+ response = await self.transport.handle_async_request(request)
assert isinstance(response.stream, AsyncByteStream)
response.request = request
async def aclose(self) -> None:
"""
- Close transport and proxies.
+ Close transport.
"""
if self._state != ClientState.CLOSED:
self._state = ClientState.CLOSED
-
- await self._transport.aclose()
- for proxy in self._mounts.values():
- if proxy is not None:
- await proxy.aclose()
+ await self.transport.aclose()
async def __aenter__(self: U) -> U:
if self._state != ClientState.UNOPENED:
raise RuntimeError(msg)
self._state = ClientState.OPENED
-
- await self._transport.__aenter__()
- for proxy in self._mounts.values():
- if proxy is not None:
- await proxy.__aenter__()
+ await self.transport.__aenter__()
return self
async def __aexit__(
traceback: TracebackType | None = None,
) -> None:
self._state = ClientState.CLOSED
-
- await self._transport.__aexit__(exc_type, exc_value, traceback)
- for proxy in self._mounts.values():
- if proxy is not None:
- await proxy.__aexit__(exc_type, exc_value, traceback)
+ await self.transport.__aexit__(exc_type, exc_value, traceback)
self.headers = headers
self.ssl_context = ssl_context
- @property
- def raw_auth(self) -> tuple[bytes, bytes] | None:
- # The proxy authentication as raw bytes.
- return (
- None
- if self.auth is None
- else (self.auth[0].encode("utf-8"), self.auth[1].encode("utf-8"))
- )
-
def __repr__(self) -> str:
# The authentication is represented with the password component masked.
auth = (self.auth[0], "********") if self.auth else None
verify: typing.Any = None,
cert: typing.Any = None,
) -> None:
+ if http2:
+ try:
+ import h2 # noqa
+ except ImportError: # pragma: no cover
+ raise ImportError(
+ "Using http2=True, but the 'h2' package is not installed. "
+ "Make sure to install httpx using `pip install httpx[http2]`."
+ ) from None
+
import httpcore
proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy
verify: typing.Any = None,
cert: typing.Any = None,
) -> None:
+ if http2:
+ try:
+ import h2 # noqa
+ except ImportError: # pragma: no cover
+ raise ImportError(
+ "Using http2=True, but the 'h2' package is not installed. "
+ "Make sure to install httpx using `pip install httpx[http2]`."
+ ) from None
+
import httpcore
proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy
import codecs
import email.message
-import ipaddress
import mimetypes
import os
import re
import typing
-from urllib.request import getproxies
from ._types import PrimitiveData
)
-def get_environment_proxies() -> dict[str, str | None]:
- """Gets proxy information from the environment"""
-
- # urllib.request.getproxies() falls back on System
- # Registry and Config for proxies on Windows and macOS.
- # We don't want to propagate non-HTTP proxies into
- # our configuration such as 'TRAVIS_APT_PROXY'.
- proxy_info = getproxies()
- mounts: dict[str, str | None] = {}
-
- for scheme in ("http", "https", "all"):
- if proxy_info.get(scheme):
- hostname = proxy_info[scheme]
- mounts[f"{scheme}://"] = (
- hostname if "://" in hostname else f"http://{hostname}"
- )
-
- no_proxy_hosts = [host.strip() for host in proxy_info.get("no", "").split(",")]
- for hostname in no_proxy_hosts:
- # See https://curl.haxx.se/libcurl/c/CURLOPT_NOPROXY.html for details
- # on how names in `NO_PROXY` are handled.
- if hostname == "*":
- # If NO_PROXY=* is used or if "*" occurs as any one of the comma
- # separated hostnames, then we should just bypass any information
- # from HTTP_PROXY, HTTPS_PROXY, ALL_PROXY, and always ignore
- # proxies.
- return {}
- elif hostname:
- # NO_PROXY=.google.com is marked as "all://*.google.com,
- # which disables "www.google.com" but not "google.com"
- # NO_PROXY=google.com is marked as "all://*google.com,
- # which disables "www.google.com" and "google.com".
- # (But not "wwwgoogle.com")
- # NO_PROXY can include domains, IPv6, IPv4 addresses and "localhost"
- # NO_PROXY=example.com,::1,localhost,192.168.0.0/16
- if "://" in hostname:
- mounts[hostname] = None
- elif is_ipv4_hostname(hostname):
- mounts[f"all://{hostname}"] = None
- elif is_ipv6_hostname(hostname):
- mounts[f"all://[{hostname}]"] = None
- elif hostname.lower() == "localhost":
- mounts[f"all://{hostname}"] = None
- else:
- mounts[f"all://*{hostname}"] = None
-
- return mounts
-
-
def to_bytes(value: str | bytes, encoding: str = "utf-8") -> bytes:
return value.encode(encoding) if isinstance(value, str) else value
return None
return length
-
-
-class URLPattern:
- """
- A utility class currently used for making lookups against proxy keys...
-
- # Wildcard matching...
- >>> pattern = URLPattern("all://")
- >>> pattern.matches(httpx.URL("http://example.com"))
- True
-
- # Witch scheme matching...
- >>> pattern = URLPattern("https://")
- >>> pattern.matches(httpx.URL("https://example.com"))
- True
- >>> pattern.matches(httpx.URL("http://example.com"))
- False
-
- # With domain matching...
- >>> pattern = URLPattern("https://example.com")
- >>> pattern.matches(httpx.URL("https://example.com"))
- True
- >>> pattern.matches(httpx.URL("http://example.com"))
- False
- >>> pattern.matches(httpx.URL("https://other.com"))
- False
-
- # Wildcard scheme, with domain matching...
- >>> pattern = URLPattern("all://example.com")
- >>> pattern.matches(httpx.URL("https://example.com"))
- True
- >>> pattern.matches(httpx.URL("http://example.com"))
- True
- >>> pattern.matches(httpx.URL("https://other.com"))
- False
-
- # With port matching...
- >>> pattern = URLPattern("https://example.com:1234")
- >>> pattern.matches(httpx.URL("https://example.com:1234"))
- True
- >>> pattern.matches(httpx.URL("https://example.com"))
- False
- """
-
- def __init__(self, pattern: str) -> None:
- from ._urls import URL
-
- if pattern and ":" not in pattern:
- raise ValueError(
- f"Proxy keys should use proper URL forms rather "
- f"than plain scheme strings. "
- f'Instead of "{pattern}", use "{pattern}://"'
- )
-
- url = URL(pattern)
- self.pattern = pattern
- self.scheme = "" if url.scheme == "all" else url.scheme
- self.host = "" if url.host == "*" else url.host
- self.port = url.port
- if not url.host or url.host == "*":
- self.host_regex: typing.Pattern[str] | None = None
- elif url.host.startswith("*."):
- # *.example.com should match "www.example.com", but not "example.com"
- domain = re.escape(url.host[2:])
- self.host_regex = re.compile(f"^.+\\.{domain}$")
- elif url.host.startswith("*"):
- # *example.com should match "www.example.com" and "example.com"
- domain = re.escape(url.host[1:])
- self.host_regex = re.compile(f"^(.+\\.)?{domain}$")
- else:
- # example.com should match "example.com" but not "www.example.com"
- domain = re.escape(url.host)
- self.host_regex = re.compile(f"^{domain}$")
-
- def matches(self, other: URL) -> bool:
- if self.scheme and self.scheme != other.scheme:
- return False
- if (
- self.host
- and self.host_regex is not None
- and not self.host_regex.match(other.host)
- ):
- return False
- if self.port is not None and self.port != other.port:
- return False
- return True
-
- @property
- def priority(self) -> tuple[int, int, int]:
- """
- The priority allows URLPattern instances to be sortable, so that
- we can match from most specific to least specific.
- """
- # URLs with a port should take priority over URLs without a port.
- port_priority = 0 if self.port is not None else 1
- # Longer hostnames should match first.
- host_priority = -len(self.host)
- # Longer schemes should match first.
- scheme_priority = -len(self.scheme)
- return (port_priority, host_priority, scheme_priority)
-
- def __hash__(self) -> int:
- return hash(self.pattern)
-
- def __lt__(self, other: URLPattern) -> bool:
- return self.priority < other.priority
-
- def __eq__(self, other: typing.Any) -> bool:
- return isinstance(other, URLPattern) and self.pattern == other.pattern
-
-
-def is_ipv4_hostname(hostname: str) -> bool:
- try:
- ipaddress.IPv4Address(hostname.split("/")[0])
- except Exception:
- return False
- return True
-
-
-def is_ipv6_hostname(hostname: str) -> bool:
- try:
- ipaddress.IPv6Address(hostname.split("/")[0])
- except Exception:
- return False
- return True
]
-@pytest.mark.anyio
-async def test_context_managed_transport_and_mount():
- class Transport(httpx.AsyncBaseTransport):
- def __init__(self, name: str) -> None:
- self.name: str = name
- self.events: list[str] = []
-
- async def aclose(self):
- # The base implementation of httpx.AsyncBaseTransport just
- # calls into `.aclose`, so simple transport cases can just override
- # this method for any cleanup, where more complex cases
- # might want to additionally override `__aenter__`/`__aexit__`.
- self.events.append(f"{self.name}.aclose")
-
- async def __aenter__(self):
- await super().__aenter__()
- self.events.append(f"{self.name}.__aenter__")
-
- async def __aexit__(self, *args):
- await super().__aexit__(*args)
- self.events.append(f"{self.name}.__aexit__")
-
- transport = Transport(name="transport")
- mounted = Transport(name="mounted")
- async with httpx.AsyncClient(
- transport=transport, mounts={"http://www.example.org": mounted}
- ):
- pass
-
- assert transport.events == [
- "transport.__aenter__",
- "transport.aclose",
- "transport.__aexit__",
- ]
- assert mounted.events == [
- "mounted.__aenter__",
- "mounted.aclose",
- "mounted.__aexit__",
- ]
-
-
def hello_world(request):
return httpx.Response(200, text="Hello, world!")
await client.get("http://example.com")
-def unmounted(request: httpx.Request) -> httpx.Response:
- data = {"app": "unmounted"}
- return httpx.Response(200, json=data)
-
-
-def mounted(request: httpx.Request) -> httpx.Response:
- data = {"app": "mounted"}
- return httpx.Response(200, json=data)
-
-
-@pytest.mark.anyio
-async def test_mounted_transport():
- transport = httpx.MockTransport(unmounted)
- mounts = {"custom://": httpx.MockTransport(mounted)}
-
- async with httpx.AsyncClient(transport=transport, mounts=mounts) as client:
- response = await client.get("https://www.example.com")
- assert response.status_code == 200
- assert response.json() == {"app": "unmounted"}
-
- response = await client.get("custom://www.example.com")
- assert response.status_code == 200
- assert response.json() == {"app": "mounted"}
-
-
@pytest.mark.anyio
async def test_async_mock_transport():
async def hello_world(request: httpx.Request) -> httpx.Response:
]
-def test_context_managed_transport_and_mount():
- class Transport(httpx.BaseTransport):
- def __init__(self, name: str) -> None:
- self.name: str = name
- self.events: list[str] = []
-
- def close(self):
- # The base implementation of httpx.BaseTransport just
- # calls into `.close`, so simple transport cases can just override
- # this method for any cleanup, where more complex cases
- # might want to additionally override `__enter__`/`__exit__`.
- self.events.append(f"{self.name}.close")
-
- def __enter__(self):
- super().__enter__()
- self.events.append(f"{self.name}.__enter__")
-
- def __exit__(self, *args):
- super().__exit__(*args)
- self.events.append(f"{self.name}.__exit__")
-
- transport = Transport(name="transport")
- mounted = Transport(name="mounted")
- with httpx.Client(transport=transport, mounts={"http://www.example.org": mounted}):
- pass
-
- assert transport.events == [
- "transport.__enter__",
- "transport.close",
- "transport.__exit__",
- ]
- assert mounted.events == [
- "mounted.__enter__",
- "mounted.close",
- "mounted.__exit__",
- ]
-
-
def hello_world(request):
return httpx.Response(200, text="Hello, world!")
]
-def unmounted(request: httpx.Request) -> httpx.Response:
- data = {"app": "unmounted"}
- return httpx.Response(200, json=data)
-
-
-def mounted(request: httpx.Request) -> httpx.Response:
- data = {"app": "mounted"}
- return httpx.Response(200, json=data)
-
-
-def test_mounted_transport():
- transport = httpx.MockTransport(unmounted)
- mounts = {"custom://": httpx.MockTransport(mounted)}
-
- client = httpx.Client(transport=transport, mounts=mounts)
-
- response = client.get("https://www.example.com")
- assert response.status_code == 200
- assert response.json() == {"app": "unmounted"}
-
- response = client.get("custom://www.example.com")
- assert response.status_code == 200
- assert response.json() == {"app": "mounted"}
-
-
-def test_all_mounted_transport():
- mounts = {"all://": httpx.MockTransport(mounted)}
-
- client = httpx.Client(mounts=mounts)
-
- response = client.get("https://www.example.com")
- assert response.status_code == 200
- assert response.json() == {"app": "mounted"}
-
-
def test_server_extensions(server):
url = server.url.copy_with(path="/http_version_2")
with httpx.Client(http2=True) as client:
-import httpcore
-import pytest
-
-import httpx
-
-
-def url_to_origin(url: str) -> httpcore.URL:
- """
- Given a URL string, return the origin in the raw tuple format that
- `httpcore` uses for it's representation.
- """
- u = httpx.URL(url)
- return httpcore.URL(scheme=u.raw_scheme, host=u.raw_host, port=u.port, target="/")
-
-
-def test_socks_proxy():
- url = httpx.URL("http://www.example.com")
-
- for proxy in ("socks5://localhost/", "socks5h://localhost/"):
- client = httpx.Client(proxy=proxy)
- transport = client._transport_for_url(url)
- assert isinstance(transport, httpx.HTTPTransport)
- assert isinstance(transport._pool, httpcore.SOCKSProxy)
-
- async_client = httpx.AsyncClient(proxy=proxy)
- async_transport = async_client._transport_for_url(url)
- assert isinstance(async_transport, httpx.AsyncHTTPTransport)
- assert isinstance(async_transport._pool, httpcore.AsyncSOCKSProxy)
-
-
-PROXY_URL = "http://[::1]"
-
-
-@pytest.mark.parametrize(
- ["url", "proxies", "expected"],
- [
- ("http://example.com", {}, None),
- ("http://example.com", {"https://": PROXY_URL}, None),
- ("http://example.com", {"http://example.net": PROXY_URL}, None),
- # Using "*" should match any domain name.
- ("http://example.com", {"http://*": PROXY_URL}, PROXY_URL),
- ("https://example.com", {"http://*": PROXY_URL}, None),
- # Using "example.com" should match example.com, but not www.example.com
- ("http://example.com", {"http://example.com": PROXY_URL}, PROXY_URL),
- ("http://www.example.com", {"http://example.com": PROXY_URL}, None),
- # Using "*.example.com" should match www.example.com, but not example.com
- ("http://example.com", {"http://*.example.com": PROXY_URL}, None),
- ("http://www.example.com", {"http://*.example.com": PROXY_URL}, PROXY_URL),
- # Using "*example.com" should match example.com and www.example.com
- ("http://example.com", {"http://*example.com": PROXY_URL}, PROXY_URL),
- ("http://www.example.com", {"http://*example.com": PROXY_URL}, PROXY_URL),
- ("http://wwwexample.com", {"http://*example.com": PROXY_URL}, None),
- # ...
- ("http://example.com:443", {"http://example.com": PROXY_URL}, PROXY_URL),
- ("http://example.com", {"all://": PROXY_URL}, PROXY_URL),
- ("http://example.com", {"http://": PROXY_URL}, PROXY_URL),
- ("http://example.com", {"all://example.com": PROXY_URL}, PROXY_URL),
- ("http://example.com", {"http://example.com": PROXY_URL}, PROXY_URL),
- ("http://example.com", {"http://example.com:80": PROXY_URL}, PROXY_URL),
- ("http://example.com:8080", {"http://example.com:8080": PROXY_URL}, PROXY_URL),
- ("http://example.com:8080", {"http://example.com": PROXY_URL}, PROXY_URL),
- (
- "http://example.com",
- {
- "all://": PROXY_URL + ":1",
- "http://": PROXY_URL + ":2",
- "all://example.com": PROXY_URL + ":3",
- "http://example.com": PROXY_URL + ":4",
- },
- PROXY_URL + ":4",
- ),
- (
- "http://example.com",
- {
- "all://": PROXY_URL + ":1",
- "http://": PROXY_URL + ":2",
- "all://example.com": PROXY_URL + ":3",
- },
- PROXY_URL + ":3",
- ),
- (
- "http://example.com",
- {"all://": PROXY_URL + ":1", "http://": PROXY_URL + ":2"},
- PROXY_URL + ":2",
- ),
- ],
-)
-def test_transport_for_request(url, proxies, expected):
- mounts = {key: httpx.HTTPTransport(proxy=value) for key, value in proxies.items()}
- client = httpx.Client(mounts=mounts)
-
- transport = client._transport_for_url(httpx.URL(url))
-
- if expected is None:
- assert transport is client._transport
- else:
- assert isinstance(transport, httpx.HTTPTransport)
- assert isinstance(transport._pool, httpcore.HTTPProxy)
- assert transport._pool._proxy_url == url_to_origin(expected)
-
-
-@pytest.mark.anyio
-@pytest.mark.network
-async def test_async_proxy_close():
- try:
- transport = httpx.AsyncHTTPTransport(proxy=PROXY_URL)
- client = httpx.AsyncClient(mounts={"https://": transport})
- await client.get("http://example.com")
- finally:
- await client.aclose()
-
-
-@pytest.mark.network
-def test_sync_proxy_close():
- try:
- transport = httpx.HTTPTransport(proxy=PROXY_URL)
- client = httpx.Client(mounts={"https://": transport})
- client.get("http://example.com")
- finally:
- client.close()
-
-
-def test_unsupported_proxy_scheme():
- with pytest.raises(ValueError):
- httpx.Client(proxy="ftp://127.0.0.1")
-
-
-@pytest.mark.parametrize(
- ["url", "env", "expected"],
- [
- ("http://google.com", {}, None),
- (
- "http://google.com",
- {"HTTP_PROXY": "http://example.com"},
- "http://example.com",
- ),
- # Auto prepend http scheme
- ("http://google.com", {"HTTP_PROXY": "example.com"}, "http://example.com"),
- (
- "http://google.com",
- {"HTTP_PROXY": "http://example.com", "NO_PROXY": "google.com"},
- None,
- ),
- # Everything proxied when NO_PROXY is empty/unset
- (
- "http://127.0.0.1",
- {"ALL_PROXY": "http://localhost:123", "NO_PROXY": ""},
- "http://localhost:123",
- ),
- # Not proxied if NO_PROXY matches URL.
- (
- "http://127.0.0.1",
- {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "127.0.0.1"},
- None,
- ),
- # Proxied if NO_PROXY scheme does not match URL.
- (
- "http://127.0.0.1",
- {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "https://127.0.0.1"},
- "http://localhost:123",
- ),
- # Proxied if NO_PROXY scheme does not match host.
- (
- "http://127.0.0.1",
- {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "1.1.1.1"},
- "http://localhost:123",
- ),
- # Not proxied if NO_PROXY matches host domain suffix.
- (
- "http://courses.mit.edu",
- {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu"},
- None,
- ),
- # Proxied even though NO_PROXY matches host domain *prefix*.
- (
- "https://mit.edu.info",
- {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu"},
- "http://localhost:123",
- ),
- # Not proxied if one item in NO_PROXY case matches host domain suffix.
- (
- "https://mit.edu.info",
- {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu,edu.info"},
- None,
- ),
- # Not proxied if one item in NO_PROXY case matches host domain suffix.
- # May include whitespace.
- (
- "https://mit.edu.info",
- {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu, edu.info"},
- None,
- ),
- # Proxied if no items in NO_PROXY match.
- (
- "https://mit.edu.info",
- {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "mit.edu,mit.info"},
- "http://localhost:123",
- ),
- # Proxied if NO_PROXY domain doesn't match.
- (
- "https://foo.example.com",
- {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "www.example.com"},
- "http://localhost:123",
- ),
- # Not proxied for subdomains matching NO_PROXY, with a leading ".".
- (
- "https://www.example1.com",
- {"ALL_PROXY": "http://localhost:123", "NO_PROXY": ".example1.com"},
- None,
- ),
- # Proxied, because NO_PROXY subdomains only match if "." separated.
- (
- "https://www.example2.com",
- {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "ample2.com"},
- "http://localhost:123",
- ),
- # No requests are proxied if NO_PROXY="*" is set.
- (
- "https://www.example3.com",
- {"ALL_PROXY": "http://localhost:123", "NO_PROXY": "*"},
- None,
- ),
- ],
-)
-@pytest.mark.parametrize("client_class", [httpx.Client, httpx.AsyncClient])
-def test_proxies_environ(monkeypatch, client_class, url, env, expected):
- for name, value in env.items():
- monkeypatch.setenv(name, value)
-
- client = client_class()
- transport = client._transport_for_url(httpx.URL(url))
-
- if expected is None:
- assert transport == client._transport
- else:
- assert transport._pool._proxy_url == url_to_origin(expected)
-
-
-@pytest.mark.parametrize(
- ["proxies", "is_valid"],
- [
- ({"http": "http://127.0.0.1"}, False),
- ({"https": "http://127.0.0.1"}, False),
- ({"all": "http://127.0.0.1"}, False),
- ({"http://": "http://127.0.0.1"}, True),
- ({"https://": "http://127.0.0.1"}, True),
- ({"all://": "http://127.0.0.1"}, True),
- ],
-)
-def test_for_deprecated_proxy_params(proxies, is_valid):
- mounts = {key: httpx.HTTPTransport(proxy=value) for key, value in proxies.items()}
-
- if not is_valid:
- with pytest.raises(ValueError):
- httpx.Client(mounts=mounts)
- else:
- httpx.Client(mounts=mounts)
-
-
-def test_proxy_with_mounts():
- proxy_transport = httpx.HTTPTransport(proxy="http://127.0.0.1")
- client = httpx.Client(mounts={"http://": proxy_transport})
-
- transport = client._transport_for_url(httpx.URL("http://example.com"))
- assert transport == proxy_transport
import json
import logging
-import os
-import random
import pytest
import httpx
-from httpx._utils import (
- URLPattern,
- get_environment_proxies,
-)
@pytest.mark.parametrize(
]
-@pytest.mark.parametrize(
- ["environment", "proxies"],
- [
- ({}, {}),
- ({"HTTP_PROXY": "http://127.0.0.1"}, {"http://": "http://127.0.0.1"}),
- (
- {"https_proxy": "http://127.0.0.1", "HTTP_PROXY": "https://127.0.0.1"},
- {"https://": "http://127.0.0.1", "http://": "https://127.0.0.1"},
- ),
- ({"all_proxy": "http://127.0.0.1"}, {"all://": "http://127.0.0.1"}),
- ({"TRAVIS_APT_PROXY": "http://127.0.0.1"}, {}),
- ({"no_proxy": "127.0.0.1"}, {"all://127.0.0.1": None}),
- ({"no_proxy": "192.168.0.0/16"}, {"all://192.168.0.0/16": None}),
- ({"no_proxy": "::1"}, {"all://[::1]": None}),
- ({"no_proxy": "localhost"}, {"all://localhost": None}),
- ({"no_proxy": "github.com"}, {"all://*github.com": None}),
- ({"no_proxy": ".github.com"}, {"all://*.github.com": None}),
- ({"no_proxy": "http://github.com"}, {"http://github.com": None}),
- ],
-)
-def test_get_environment_proxies(environment, proxies):
- os.environ.update(environment)
-
- assert get_environment_proxies() == proxies
-
-
@pytest.mark.parametrize(
"headers, output",
[
headers = client._redirect_headers(request, url, "GET")
assert "Authorization" not in headers
-
-
-@pytest.mark.parametrize(
- ["pattern", "url", "expected"],
- [
- ("http://example.com", "http://example.com", True),
- ("http://example.com", "https://example.com", False),
- ("http://example.com", "http://other.com", False),
- ("http://example.com:123", "http://example.com:123", True),
- ("http://example.com:123", "http://example.com:456", False),
- ("http://example.com:123", "http://example.com", False),
- ("all://example.com", "http://example.com", True),
- ("all://example.com", "https://example.com", True),
- ("http://", "http://example.com", True),
- ("http://", "https://example.com", False),
- ("all://", "https://example.com:123", True),
- ("", "https://example.com:123", True),
- ],
-)
-def test_url_matches(pattern, url, expected):
- pattern = URLPattern(pattern)
- assert pattern.matches(httpx.URL(url)) == expected
-
-
-def test_pattern_priority():
- matchers = [
- URLPattern("all://"),
- URLPattern("http://"),
- URLPattern("http://example.com"),
- URLPattern("http://example.com:123"),
- ]
- random.shuffle(matchers)
- assert sorted(matchers) == [
- URLPattern("http://example.com:123"),
- URLPattern("http://example.com"),
- URLPattern("http://"),
- URLPattern("all://"),
- ]