)
from ._utils import (
NetRCInfo,
+ enforce_http_url,
get_environment_proxies,
get_logger,
same_origin,
trust_env: bool = True,
):
if base_url is None:
- self.base_url = URL("", allow_relative=True)
+ self.base_url = URL("")
else:
self.base_url = URL(base_url)
"""
location = response.headers["Location"]
- url = URL(location, allow_relative=True)
+ url = URL(location)
# Check that we can handle the scheme
if url.scheme and url.scheme not in ("http", "https"):
Returns the transport instance that should be used for a given URL.
This will either be the standard connection pool, or a proxy.
"""
+ enforce_http_url(url)
+
if self._proxies and not should_not_be_proxied(url):
is_default_port = (url.scheme == "http" and url.port == 80) or (
url.scheme == "https" and url.port == 443
"""
Sends a single request, without handling any redirections.
"""
-
transport = self._transport_for_url(request.url)
with map_exceptions(HTTPCORE_EXC_MAP, request=request):
Returns the transport instance that should be used for a given URL.
This will either be the standard connection pool, or a proxy.
"""
+ enforce_http_url(url)
+
if self._proxies and not should_not_be_proxied(url):
is_default_port = (url.scheme == "http" and url.port == 80) or (
url.scheme == "https" and url.port == 443
allow_redirects: bool = True,
timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> Response:
- if request.url.scheme not in ("http", "https"):
- raise InvalidURL('URL scheme must be "http" or "https".')
-
timeout = self.timeout if isinstance(timeout, UnsetType) else Timeout(timeout)
auth = self._build_auth(request, auth)
"""
Sends a single request, without handling any redirections.
"""
-
transport = self._transport_for_url(request.url)
with map_exceptions(HTTPCORE_EXC_MAP, request=request):
from ._exceptions import (
CookieConflict,
HTTPStatusError,
- InvalidURL,
NotRedirectResponse,
RequestNotRead,
ResponseClosed,
class URL:
- def __init__(
- self,
- url: URLTypes,
- allow_relative: bool = False,
- params: QueryParamTypes = None,
- ) -> None:
+ def __init__(self, url: URLTypes, params: QueryParamTypes = None) -> None:
if isinstance(url, str):
self._uri_reference = rfc3986.api.iri_reference(url).encode()
else:
query_string = str(QueryParams(params))
self._uri_reference = self._uri_reference.copy_with(query=query_string)
- # Enforce absolute URLs by default.
- if not allow_relative:
- if not self.scheme:
- raise InvalidURL("No scheme included in URL.")
- if not self.host:
- raise InvalidURL("No host included in URL.")
-
@property
def scheme(self) -> str:
return self._uri_reference.scheme or ""
kwargs["authority"] = authority
- return URL(
- self._uri_reference.copy_with(**kwargs).unsplit(),
- allow_relative=self.is_relative_url,
- )
+ return URL(self._uri_reference.copy_with(**kwargs).unsplit(),)
def join(self, relative_url: URLTypes) -> "URL":
"""
# We drop any fragment portion, because RFC 3986 strictly
# treats URLs with a fragment portion as not being absolute URLs.
base_uri = self._uri_reference.copy_with(fragment=None)
- relative_url = URL(relative_url, allow_relative=True)
+ relative_url = URL(relative_url)
return URL(relative_url._uri_reference.resolve_with(base_uri).unsplit())
def __hash__(self) -> int:
from types import TracebackType
from urllib.request import getproxies
+from ._exceptions import InvalidURL
from ._types import PrimitiveData
if typing.TYPE_CHECKING: # pragma: no cover
return typing.cast(Logger, logger)
+def enforce_http_url(url: "URL") -> None:
+ """
+ Raise an appropriate InvalidURL for any non-HTTP URLs.
+ """
+ if not url.scheme:
+ raise InvalidURL("No scheme included in URL.")
+ if not url.host:
+ raise InvalidURL("No host included in URL.")
+ if url.scheme not in ("http", "https"):
+ raise InvalidURL('URL scheme must be "http" or "https".')
+
+
def same_origin(url: "URL", other: "URL") -> bool:
"""
Return 'True' if the given URLs share the same origin.
async with httpx.AsyncClient() as client:
with pytest.raises(httpx.InvalidURL):
await client.get("invalid://example.org")
+ with pytest.raises(httpx.InvalidURL):
+ await client.get("://example.org")
+ with pytest.raises(httpx.InvalidURL):
+ await client.get("http://")
@pytest.mark.usefixtures("async_environment")
assert response.elapsed > timedelta(0)
+def test_get_invalid_url(server):
+ with httpx.Client() as client:
+ with pytest.raises(httpx.InvalidURL):
+ client.get("invalid://example.org")
+ with pytest.raises(httpx.InvalidURL):
+ client.get("://example.org")
+ with pytest.raises(httpx.InvalidURL):
+ client.get("http://")
+
+
def test_build_request(server):
url = server.url.copy_with(path="/echo_headers")
headers = {"Custom-header": "value"}
assert request.url.scheme == "https"
assert request.url.port == 443
assert request.url.full_path == "/abc?foo=bar"
-
-
-def test_invalid_urls():
- with pytest.raises(httpx.InvalidURL):
- httpx.Request("GET", "example.org")
-
- with pytest.raises(httpx.InvalidURL):
- httpx.Request("GET", "http:///foo")
import pytest
-from httpx import URL, InvalidURL
+from httpx import URL
@pytest.mark.parametrize(
url = URL("http://example.com/b/c/d;p?q")
- with pytest.raises(InvalidURL):
- assert url.join("g:h") == "g:h"
-
assert url.join("g") == "http://example.com/b/c/g"
assert url.join("./g") == "http://example.com/b/c/g"
assert url.join("g/") == "http://example.com/b/c/g/"