]> git.ipfire.org Git - thirdparty/httpx.git/commitdiff
Drop `rfc3986` requirement. (#2252)
authorTom Christie <tom@tomchristie.com>
Tue, 10 Jan 2023 10:36:15 +0000 (10:36 +0000)
committerGitHub <noreply@github.com>
Tue, 10 Jan 2023 10:36:15 +0000 (10:36 +0000)
* Drop RawURL

* First pass at adding urlparse

* Update urlparse

* Add urlparse

* Add urlparse

* Unicode non-printables can be valid in IDNA hostnames

* Update _urlparse.py docstring

* Linting

* Trim away unused codepaths

* Tweaks for path validation depending on scheme and authority presence

* Minor cleanups

* Minor cleanups

* full_path -> raw_path, for internal consistency

* Linting fixes

* Drop rfc3986 dependency

* Add test for #1833

* Linting

* Drop 'rfc3986' dependency from README and docs homepage

Co-authored-by: Thomas Grainger <tagrain@gmail.com>
README.md
docs/index.md
httpx/_urlparse.py [new file with mode: 0644]
httpx/_urls.py
pyproject.toml
tests/models/test_url.py
tests/test_asgi.py
tests/test_urlparse.py [new file with mode: 0644]

index 520e85c360f4f59b2d085ea778745333741ab586..4d25491a6a0f80321e78124e7155f4502f461cd2 100644 (file)
--- a/README.md
+++ b/README.md
@@ -128,8 +128,7 @@ The HTTPX project relies on these excellent libraries:
 * `httpcore` - The underlying transport implementation for `httpx`.
   * `h11` - HTTP/1.1 support.
 * `certifi` - SSL certificates.
-* `rfc3986` - URL parsing & normalization.
-  * `idna` - Internationalized domain name support.
+* `idna` - Internationalized domain name support.
 * `sniffio` - Async library autodetection.
 
 As well as these optional installs:
index ec16ce7d1a31e195f440759aaf378422c5f26aae..cd25ee6ca50e2ff2bc28ce7868c4d91ea8a14e0b 100644 (file)
@@ -109,8 +109,7 @@ The HTTPX project relies on these excellent libraries:
 * `httpcore` - The underlying transport implementation for `httpx`.
   * `h11` - HTTP/1.1 support.
 * `certifi` - SSL certificates.
-* `rfc3986` - URL parsing & normalization.
-  * `idna` - Internationalized domain name support.
+* `idna` - Internationalized domain name support.
 * `sniffio` - Async library autodetection.
 
 As well as these optional installs:
diff --git a/httpx/_urlparse.py b/httpx/_urlparse.py
new file mode 100644 (file)
index 0000000..e16e812
--- /dev/null
@@ -0,0 +1,435 @@
+"""
+An implementation of `urlparse` that provides URL validation and normalization
+as described by RFC3986.
+
+We rely on this implementation rather than the one in Python's stdlib, because:
+
+* It provides more complete URL validation.
+* It properly differentiates between an empty querystring and an absent querystring,
+  to distinguish URLs with a trailing '?'.
+* It handles scheme, hostname, port, and path normalization.
+* It supports IDNA hostnames, normalizing them to their encoded form.
+* The API supports passing individual components, as well as the complete URL string.
+
+Previously we relied on the excellent `rfc3986` package to handle URL parsing and
+validation, but this module provides a simpler alternative, with less indirection
+required.
+"""
+import ipaddress
+import re
+import typing
+
+import idna
+
+from ._exceptions import InvalidURL
+
+MAX_URL_LENGTH = 65536
+
+# https://datatracker.ietf.org/doc/html/rfc3986.html#section-2.3
+UNRESERVED_CHARACTERS = (
+    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~"
+)
+SUB_DELIMS = "!$&'()*+,;="
+
+PERCENT_ENCODED_REGEX = re.compile("%[A-Fa-f0-9]{2}")
+
+
+# {scheme}:      (optional)
+# //{authority}  (optional)
+# {path}
+# ?{query}       (optional)
+# #{fragment}    (optional)
+URL_REGEX = re.compile(
+    (
+        r"(?:(?P<scheme>{scheme}):)?"
+        r"(?://(?P<authority>{authority}))?"
+        r"(?P<path>{path})"
+        r"(?:\?(?P<query>{query}))?"
+        r"(?:#(?P<fragment>{fragment}))?"
+    ).format(
+        scheme="([a-zA-Z][a-zA-Z0-9+.-]*)?",
+        authority="[^/?#]*",
+        path="[^?#]*",
+        query="[^#]*",
+        fragment=".*",
+    )
+)
+
+# {userinfo}@    (optional)
+# {host}
+# :{port}        (optional)
+AUTHORITY_REGEX = re.compile(
+    (
+        r"(?:(?P<userinfo>{userinfo})@)?" r"(?P<host>{host})" r":?(?P<port>{port})?"
+    ).format(
+        userinfo="[^@]*",  # Any character sequence not including '@'.
+        host="(\\[.*\\]|[^:]*)",  # Either any character sequence not including ':',
+        # or an IPv6 address enclosed within square brackets.
+        port=".*",  # Any character sequence.
+    )
+)
+
+
+# If we call urlparse with an individual component, then we need to regex
+# validate that component individually.
+# Note that we're duplicating the same strings as above. Shock! Horror!!
+COMPONENT_REGEX = {
+    "scheme": re.compile("([a-zA-Z][a-zA-Z0-9+.-]*)?"),
+    "authority": re.compile("[^/?#]*"),
+    "path": re.compile("[^?#]*"),
+    "query": re.compile("[^#]*"),
+    "fragment": re.compile(".*"),
+    "userinfo": re.compile("[^@]*"),
+    "host": re.compile("(\\[.*\\]|[^:]*)"),
+    "port": re.compile(".*"),
+}
+
+
+# We use these simple regexes as a first pass before handing off to
+# the stdlib 'ipaddress' module for IP address validation.
+IPv4_STYLE_HOSTNAME = re.compile(r"^[0-9]+.[0-9]+.[0-9]+.[0-9]+$")
+IPv6_STYLE_HOSTNAME = re.compile(r"^\[.*\]$")
+
+
+class ParseResult(typing.NamedTuple):
+    scheme: str
+    userinfo: str
+    host: str
+    port: typing.Optional[int]
+    path: str
+    query: typing.Optional[str]
+    fragment: typing.Optional[str]
+
+    @property
+    def authority(self) -> str:
+        return "".join(
+            [
+                f"{self.userinfo}@" if self.userinfo else "",
+                f"[{self.host}]" if ":" in self.host else self.host,
+                f":{self.port}" if self.port is not None else "",
+            ]
+        )
+
+    @property
+    def netloc(self) -> str:
+        return "".join(
+            [
+                f"[{self.host}]" if ":" in self.host else self.host,
+                f":{self.port}" if self.port is not None else "",
+            ]
+        )
+
+    def copy_with(self, **kwargs: typing.Optional[str]) -> "ParseResult":
+        if not kwargs:
+            return self
+
+        defaults = {
+            "scheme": self.scheme,
+            "authority": self.authority,
+            "path": self.path,
+            "query": self.query,
+            "fragment": self.fragment,
+        }
+        defaults.update(kwargs)
+        return urlparse("", **defaults)
+
+    def __str__(self) -> str:
+        authority = self.authority
+        return "".join(
+            [
+                f"{self.scheme}:" if self.scheme else "",
+                f"//{authority}" if authority else "",
+                self.path,
+                f"?{self.query}" if self.query is not None else "",
+                f"#{self.fragment}" if self.fragment is not None else "",
+            ]
+        )
+
+
+def urlparse(url: str = "", **kwargs: typing.Optional[str]) -> ParseResult:
+    # Initial basic checks on allowable URLs.
+    # ---------------------------------------
+
+    # Hard limit the maximum allowable URL length.
+    if len(url) > MAX_URL_LENGTH:
+        raise InvalidURL("URL too long")
+
+    # If a URL includes any ASCII control characters including \t, \r, \n,
+    # then treat it as invalid.
+    if any(char.isascii() and not char.isprintable() for char in url):
+        raise InvalidURL("Invalid non-printable ASCII character in URL")
+
+    # Some keyword arguments require special handling.
+    # ------------------------------------------------
+
+    # Coerce "port" to a string, if it is provided as an integer.
+    if "port" in kwargs:
+        port = kwargs["port"]
+        kwargs["port"] = str(port) if isinstance(port, int) else port
+
+    # Replace "netloc" with "host" and "port".
+    if "netloc" in kwargs:
+        netloc = kwargs.pop("netloc") or ""
+        kwargs["host"], _, kwargs["port"] = netloc.partition(":")
+
+    # Replace "username" and/or "password" with "userinfo".
+    if "username" in kwargs or "password" in kwargs:
+        username = quote(kwargs.pop("username", "") or "")
+        password = quote(kwargs.pop("password", "") or "")
+        kwargs["userinfo"] = f"{username}:{password}" if password else username
+
+    # Replace "raw_path" with "path" and "query".
+    if "raw_path" in kwargs:
+        raw_path = kwargs.pop("raw_path") or ""
+        kwargs["path"], seperator, kwargs["query"] = raw_path.partition("?")
+        if not seperator:
+            kwargs["query"] = None
+
+    # Ensure that IPv6 "host" addresses are always escaped with "[...]".
+    if "host" in kwargs:
+        host = kwargs.get("host") or ""
+        if ":" in host and not (host.startswith("[") and host.endswith("]")):
+            kwargs["host"] = f"[{host}]"
+
+    # If any keyword arguments are provided, ensure they are valid.
+    # -------------------------------------------------------------
+
+    for key, value in kwargs.items():
+        if key not in (
+            "scheme",
+            "authority",
+            "path",
+            "query",
+            "fragment",
+            "userinfo",
+            "host",
+            "port",
+        ):
+            raise TypeError(f"'{key}' is an invalid keyword argument for urlparse()")
+
+        if value is not None:
+            if len(value) > MAX_URL_LENGTH:
+                raise InvalidURL(f"URL component '{key}' too long")
+
+            # If a component includes any ASCII control characters including \t, \r, \n,
+            # then treat it as invalid.
+            if any(char.isascii() and not char.isprintable() for char in value):
+                raise InvalidURL(
+                    f"Invalid non-printable ASCII character in URL component '{key}'"
+                )
+
+            # Ensure that keyword arguments match as a valid regex.
+            if not COMPONENT_REGEX[key].fullmatch(value):
+                raise InvalidURL(f"Invalid URL component '{key}'")
+
+    # The URL_REGEX will always match, but may have empty components.
+    url_match = URL_REGEX.match(url)
+    assert url_match is not None
+    url_dict = url_match.groupdict()
+
+    # * 'scheme', 'authority', and 'path' may be empty strings.
+    # * 'query' may be 'None', indicating no trailing "?" portion.
+    #   Any string including the empty string, indicates a trailing "?".
+    # * 'fragment' may be 'None', indicating no trailing "#" portion.
+    #   Any string including the empty string, indicates a trailing "#".
+    scheme = kwargs.get("scheme", url_dict["scheme"]) or ""
+    authority = kwargs.get("authority", url_dict["authority"]) or ""
+    path = kwargs.get("path", url_dict["path"]) or ""
+    query = kwargs.get("query", url_dict["query"])
+    fragment = kwargs.get("fragment", url_dict["fragment"])
+
+    # The AUTHORITY_REGEX will always match, but may have empty components.
+    authority_match = AUTHORITY_REGEX.match(authority)
+    assert authority_match is not None
+    authority_dict = authority_match.groupdict()
+
+    # * 'userinfo' and 'host' may be empty strings.
+    # * 'port' may be 'None'.
+    userinfo = kwargs.get("userinfo", authority_dict["userinfo"]) or ""
+    host = kwargs.get("host", authority_dict["host"]) or ""
+    port = kwargs.get("port", authority_dict["port"])
+
+    # Normalize and validate each component.
+    # We end up with a parsed representation of the URL,
+    # with components that are plain ASCII strings.
+    parsed_scheme: str = scheme.lower()
+    parsed_userinfo: str = quote(userinfo, safe=SUB_DELIMS + ":")
+    parsed_host: str = encode_host(host)
+    parsed_port: typing.Optional[int] = normalize_port(port, scheme)
+
+    has_scheme = parsed_scheme != ""
+    has_authority = (
+        parsed_userinfo != "" or parsed_host != "" or parsed_port is not None
+    )
+    validate_path(path, has_scheme=has_scheme, has_authority=has_authority)
+    if has_authority:
+        path = normalize_path(path)
+
+    parsed_path: str = quote(path, safe=SUB_DELIMS + ":@/")
+    parsed_query: typing.Optional[str] = (
+        None if query is None else quote(query, safe=SUB_DELIMS + "/?")
+    )
+    parsed_fragment: typing.Optional[str] = (
+        None if fragment is None else quote(fragment, safe=SUB_DELIMS + "/?")
+    )
+
+    # The parsed ASCII strings are our canonical form.
+    # All properties of the URL are derived from these.
+    return ParseResult(
+        parsed_scheme,
+        parsed_userinfo,
+        parsed_host,
+        parsed_port,
+        parsed_path,
+        parsed_query,
+        parsed_fragment,
+    )
+
+
+def encode_host(host: str) -> str:
+    if not host:
+        return ""
+
+    elif IPv4_STYLE_HOSTNAME.match(host):
+        # Validate IPv4 hostnames like #.#.#.#
+        #
+        # From https://datatracker.ietf.org/doc/html/rfc3986/#section-3.2.2
+        #
+        # IPv4address = dec-octet "." dec-octet "." dec-octet "." dec-octet
+        try:
+            ipaddress.IPv4Address(host)
+        except ipaddress.AddressValueError:
+            raise InvalidURL("Invalid IPv4 address")
+        return host
+
+    elif IPv6_STYLE_HOSTNAME.match(host):
+        # Validate IPv6 hostnames like [...]
+        #
+        # From https://datatracker.ietf.org/doc/html/rfc3986/#section-3.2.2
+        #
+        # "A host identified by an Internet Protocol literal address, version 6
+        # [RFC3513] or later, is distinguished by enclosing the IP literal
+        # within square brackets ("[" and "]").  This is the only place where
+        # square bracket characters are allowed in the URI syntax."
+        try:
+            ipaddress.IPv6Address(host[1:-1])
+        except ipaddress.AddressValueError:
+            raise InvalidURL("Invalid IPv6 address")
+        return host[1:-1]
+
+    elif host.isascii():
+        # Regular ASCII hostnames
+        #
+        # From https://datatracker.ietf.org/doc/html/rfc3986/#section-3.2.2
+        #
+        # reg-name    = *( unreserved / pct-encoded / sub-delims )
+        return quote(host.lower(), safe=SUB_DELIMS)
+
+    # IDNA hostnames
+    try:
+        return idna.encode(host.lower()).decode("ascii")
+    except idna.IDNAError:
+        raise InvalidURL("Invalid IDNA hostname")
+
+
+def normalize_port(
+    port: typing.Optional[typing.Union[str, int]], scheme: str
+) -> typing.Optional[int]:
+    # From https://tools.ietf.org/html/rfc3986#section-3.2.3
+    #
+    # "A scheme may define a default port.  For example, the "http" scheme
+    # defines a default port of "80", corresponding to its reserved TCP
+    # port number.  The type of port designated by the port number (e.g.,
+    # TCP, UDP, SCTP) is defined by the URI scheme.  URI producers and
+    # normalizers should omit the port component and its ":" delimiter if
+    # port is empty or if its value would be the same as that of the
+    # scheme's default."
+    if port is None or port == "":
+        return None
+
+    try:
+        port_as_int = int(port)
+    except ValueError:
+        raise InvalidURL("Invalid port")
+
+    # See https://url.spec.whatwg.org/#url-miscellaneous
+    default_port = {"ftp": 21, "http": 80, "https": 443, "ws": 80, "wss": 443}.get(
+        scheme
+    )
+    if port_as_int == default_port:
+        return None
+    return port_as_int
+
+
+def validate_path(path: str, has_scheme: bool, has_authority: bool) -> None:
+    """
+    Path validation rules that depend on if the URL contains a scheme or authority component.
+
+    See https://datatracker.ietf.org/doc/html/rfc3986.html#section-3.3
+    """
+    if has_authority:
+        # > If a URI contains an authority component, then the path component
+        # > must either be empty or begin with a slash ("/") character.
+        if path and not path.startswith("/"):
+            raise InvalidURL("For absolute URLs, path must be empty or begin with '/'")
+    else:
+        # > If a URI does not contain an authority component, then the path cannot begin
+        # > with two slash characters ("//").
+        if path.startswith("//"):
+            raise InvalidURL(
+                "URLs with no authority component cannot have a path starting with '//'"
+            )
+        # > In addition, a URI reference (Section 4.1) may be a relative-path reference, in which
+        # > case the first path segment cannot contain a colon (":") character.
+        if path.startswith(":") and not has_scheme:
+            raise InvalidURL(
+                "URLs with no scheme component cannot have a path starting with ':'"
+            )
+
+
+def normalize_path(path: str) -> str:
+    """
+    Drop "." and ".." segments from a URL path.
+
+    For example:
+
+        normalize_path("/path/./to/somewhere/..") == "/path/to"
+    """
+    # https://datatracker.ietf.org/doc/html/rfc3986#section-5.2.4
+    components = path.split("/")
+    output: typing.List[str] = []
+    for component in components:
+        if component == ".":
+            pass
+        elif component == "..":
+            if output and output != [""]:
+                output.pop()
+        else:
+            output.append(component)
+    return "/".join(output)
+
+
+def percent_encode(char: str) -> str:
+    """
+    Replace every character in a string with the percent-encoded representation.
+
+    Characters outside the ASCII range are represented with the percent-encoded
+    representation of their UTF-8 byte sequence.
+
+    For example:
+
+        percent_encode(" ") == "%20"
+    """
+    return "".join([f"%{byte:02x}" for byte in char.encode("utf-8")]).upper()
+
+
+def quote(string: str, safe: str = "/") -> str:
+    NON_ESCAPED_CHARS = UNRESERVED_CHARACTERS + safe
+    if string.count("%") == len(PERCENT_ENCODED_REGEX.findall(string)):
+        # If all occurrences of '%' are valid '%xx' escapes, then treat
+        # percent as a non-escaping character.
+        NON_ESCAPED_CHARS += "%"
+
+    return "".join(
+        [char if char in NON_ESCAPED_CHARS else percent_encode(char) for char in string]
+    )
index f26b2eb2dc635be8192e870549a31e54373e5bee..1bcbc8b29af98af56cb7807e7da52ddef506a387 100644 (file)
@@ -1,12 +1,10 @@
 import typing
-from urllib.parse import parse_qs, quote, unquote, urlencode
+from urllib.parse import parse_qs, unquote, urlencode
 
 import idna
-import rfc3986
-import rfc3986.exceptions
 
-from ._exceptions import InvalidURL
 from ._types import PrimitiveData, QueryParamTypes, RawURL, URLTypes
+from ._urlparse import urlparse
 from ._utils import primitive_value_to_str
 
 
@@ -70,56 +68,63 @@ class URL:
       be properly URL escaped when decoding the parameter names and values themselves.
     """
 
-    _uri_reference: rfc3986.URIReference
-
     def __init__(
         self, url: typing.Union["URL", str] = "", **kwargs: typing.Any
     ) -> None:
+        if kwargs:
+            allowed = {
+                "scheme": str,
+                "username": str,
+                "password": str,
+                "userinfo": bytes,
+                "host": str,
+                "port": int,
+                "netloc": bytes,
+                "path": str,
+                "query": bytes,
+                "raw_path": bytes,
+                "fragment": str,
+                "params": object,
+            }
+
+            # Perform type checking for all supported keyword arguments.
+            for key, value in kwargs.items():
+                if key not in allowed:
+                    message = f"{key!r} is an invalid keyword argument for URL()"
+                    raise TypeError(message)
+                if value is not None and not isinstance(value, allowed[key]):
+                    expected = allowed[key].__name__
+                    seen = type(value).__name__
+                    message = f"Argument {key!r} must be {expected} but got {seen}"
+                    raise TypeError(message)
+                if isinstance(value, bytes):
+                    kwargs[key] = value.decode("ascii")
+
+            if "params" in kwargs:
+                # Replace any "params" keyword with the raw "query" instead.
+                #
+                # Ensure that empty params use `kwargs["query"] = None` rather
+                # than `kwargs["query"] = ""`, so that generated URLs do not
+                # include an empty trailing "?".
+                params = kwargs.pop("params")
+                kwargs["query"] = None if not params else str(QueryParams(params))
+
         if isinstance(url, str):
-            try:
-                self._uri_reference = rfc3986.iri_reference(url).encode()
-            except rfc3986.exceptions.InvalidAuthority as exc:
-                raise InvalidURL(message=str(exc)) from None
-
-            if self.is_absolute_url:
-                # We don't want to normalize relative URLs, since doing so
-                # removes any leading `../` portion.
-                self._uri_reference = self._uri_reference.normalize()
+            self._uri_reference = urlparse(url, **kwargs)
         elif isinstance(url, URL):
-            self._uri_reference = url._uri_reference
+            self._uri_reference = url._uri_reference.copy_with(**kwargs)
         else:
             raise TypeError(
                 f"Invalid type for url.  Expected str or httpx.URL, got {type(url)}: {url!r}"
             )
 
-        # Perform port normalization, following the WHATWG spec for default ports.
-        #
-        # See:
-        # * https://tools.ietf.org/html/rfc3986#section-3.2.3
-        # * https://url.spec.whatwg.org/#url-miscellaneous
-        # * https://url.spec.whatwg.org/#scheme-state
-        default_port = {
-            "ftp": ":21",
-            "http": ":80",
-            "https": ":443",
-            "ws": ":80",
-            "wss": ":443",
-        }.get(self._uri_reference.scheme, "")
-        authority = self._uri_reference.authority or ""
-        if default_port and authority.endswith(default_port):
-            authority = authority[: -len(default_port)]
-            self._uri_reference = self._uri_reference.copy_with(authority=authority)
-
-        if kwargs:
-            self._uri_reference = self.copy_with(**kwargs)._uri_reference
-
     @property
     def scheme(self) -> str:
         """
         The URL scheme, such as "http", "https".
         Always normalised to lowercase.
         """
-        return self._uri_reference.scheme or ""
+        return self._uri_reference.scheme
 
     @property
     def raw_scheme(self) -> bytes:
@@ -127,7 +132,7 @@ class URL:
         The raw bytes representation of the URL scheme, such as b"http", b"https".
         Always normalised to lowercase.
         """
-        return self.scheme.encode("ascii")
+        return self._uri_reference.scheme.encode("ascii")
 
     @property
     def userinfo(self) -> bytes:
@@ -135,8 +140,7 @@ class URL:
         The URL userinfo as a raw bytestring.
         For example: b"jo%40email.com:a%20secret".
         """
-        userinfo = self._uri_reference.userinfo or ""
-        return userinfo.encode("ascii")
+        return self._uri_reference.userinfo.encode("ascii")
 
     @property
     def username(self) -> str:
@@ -144,7 +148,7 @@ class URL:
         The URL username as a string, with URL decoding applied.
         For example: "jo@email.com"
         """
-        userinfo = self._uri_reference.userinfo or ""
+        userinfo = self._uri_reference.userinfo
         return unquote(userinfo.partition(":")[0])
 
     @property
@@ -153,7 +157,7 @@ class URL:
         The URL password as a string, with URL decoding applied.
         For example: "a secret"
         """
-        userinfo = self._uri_reference.userinfo or ""
+        userinfo = self._uri_reference.userinfo
         return unquote(userinfo.partition(":")[2])
 
     @property
@@ -176,11 +180,7 @@ class URL:
         url = httpx.URL("https://[::ffff:192.168.0.1]")
         assert url.host == "::ffff:192.168.0.1"
         """
-        host: str = self._uri_reference.host or ""
-
-        if host and ":" in host and host[0] == "[":
-            # it's an IPv6 address
-            host = host.lstrip("[").rstrip("]")
+        host: str = self._uri_reference.host
 
         if host.startswith("xn--"):
             host = idna.decode(host)
@@ -207,13 +207,7 @@ class URL:
         url = httpx.URL("https://[::ffff:192.168.0.1]")
         assert url.raw_host == b"::ffff:192.168.0.1"
         """
-        host: str = self._uri_reference.host or ""
-
-        if host and ":" in host and host[0] == "[":
-            # it's an IPv6 address
-            host = host.lstrip("[").rstrip("]")
-
-        return host.encode("ascii")
+        return self._uri_reference.host.encode("ascii")
 
     @property
     def port(self) -> typing.Optional[int]:
@@ -229,8 +223,7 @@ class URL:
         assert httpx.URL("http://www.example.com") == httpx.URL("http://www.example.com:80")
         assert httpx.URL("http://www.example.com:80").port is None
         """
-        port = self._uri_reference.port
-        return int(port) if port else None
+        return self._uri_reference.port
 
     @property
     def netloc(self) -> bytes:
@@ -241,12 +234,7 @@ class URL:
         This property may be used for generating the value of a request
         "Host" header.
         """
-        host = self._uri_reference.host or ""
-        port = self._uri_reference.port
-        netloc = host.encode("ascii")
-        if port:
-            netloc = netloc + b":" + port.encode("ascii")
-        return netloc
+        return self._uri_reference.netloc.encode("ascii")
 
     @property
     def path(self) -> str:
@@ -357,127 +345,7 @@ class URL:
         url = httpx.URL("https://www.example.com").copy_with(username="jo@gmail.com", password="a secret")
         assert url == "https://jo%40email.com:a%20secret@www.example.com"
         """
-        allowed = {
-            "scheme": str,
-            "username": str,
-            "password": str,
-            "userinfo": bytes,
-            "host": str,
-            "port": int,
-            "netloc": bytes,
-            "path": str,
-            "query": bytes,
-            "raw_path": bytes,
-            "fragment": str,
-            "params": object,
-        }
-
-        # Step 1
-        # ======
-        #
-        # Perform type checking for all supported keyword arguments.
-        for key, value in kwargs.items():
-            if key not in allowed:
-                message = f"{key!r} is an invalid keyword argument for copy_with()"
-                raise TypeError(message)
-            if value is not None and not isinstance(value, allowed[key]):
-                expected = allowed[key].__name__
-                seen = type(value).__name__
-                message = f"Argument {key!r} must be {expected} but got {seen}"
-                raise TypeError(message)
-
-        # Step 2
-        # ======
-        #
-        # Consolidate "username", "password", "userinfo", "host", "port" and "netloc"
-        # into a single "authority" keyword, for `rfc3986`.
-        if "username" in kwargs or "password" in kwargs:
-            # Consolidate "username" and "password" into "userinfo".
-            username = quote(kwargs.pop("username", self.username) or "")
-            password = quote(kwargs.pop("password", self.password) or "")
-            userinfo = f"{username}:{password}" if password else username
-            kwargs["userinfo"] = userinfo.encode("ascii")
-
-        if "host" in kwargs or "port" in kwargs:
-            # Consolidate "host" and "port" into "netloc".
-            host = kwargs.pop("host", self.host) or ""
-            port = kwargs.pop("port", self.port)
-
-            if host and ":" in host and host[0] != "[":
-                # IPv6 addresses need to be escaped within square brackets.
-                host = f"[{host}]"
-
-            kwargs["netloc"] = (
-                f"{host}:{port}".encode("ascii")
-                if port is not None
-                else host.encode("ascii")
-            )
-
-        if "userinfo" in kwargs or "netloc" in kwargs:
-            # Consolidate "userinfo" and "netloc" into authority.
-            userinfo = (kwargs.pop("userinfo", self.userinfo) or b"").decode("ascii")
-            netloc = (kwargs.pop("netloc", self.netloc) or b"").decode("ascii")
-            authority = f"{userinfo}@{netloc}" if userinfo else netloc
-            kwargs["authority"] = authority
-
-        # Step 3
-        # ======
-        #
-        # Wrangle any "path", "query", "raw_path" and "params" keywords into
-        # "query" and "path" keywords for `rfc3986`.
-        if "raw_path" in kwargs:
-            # If "raw_path" is included, then split it into "path" and "query" components.
-            raw_path = kwargs.pop("raw_path") or b""
-            path, has_query, query = raw_path.decode("ascii").partition("?")
-            kwargs["path"] = path
-            kwargs["query"] = query if has_query else None
-
-        else:
-            if kwargs.get("path") is not None:
-                # Ensure `kwargs["path"] = <url quoted str>` for `rfc3986`.
-                kwargs["path"] = quote(kwargs["path"])
-
-            if kwargs.get("query") is not None:
-                # Ensure `kwargs["query"] = <str>` for `rfc3986`.
-                #
-                # Note that `.copy_with(query=None)` and `.copy_with(query=b"")`
-                # are subtly different. The `None` style will not include an empty
-                # trailing "?" character.
-                kwargs["query"] = kwargs["query"].decode("ascii")
-
-            if "params" in kwargs:
-                # Replace any "params" keyword with the raw "query" instead.
-                #
-                # Ensure that empty params use `kwargs["query"] = None` rather
-                # than `kwargs["query"] = ""`, so that generated URLs do not
-                # include an empty trailing "?".
-                params = kwargs.pop("params")
-                kwargs["query"] = None if not params else str(QueryParams(params))
-
-        # Step 4
-        # ======
-        #
-        # Ensure any fragment component is quoted.
-        if kwargs.get("fragment") is not None:
-            kwargs["fragment"] = quote(kwargs["fragment"])
-
-        # Step 5
-        # ======
-        #
-        # At this point kwargs may include keys for "scheme", "authority", "path",
-        # "query" and "fragment". Together these constitute the entire URL.
-        #
-        # See https://tools.ietf.org/html/rfc3986#section-3
-        #
-        #  foo://example.com:8042/over/there?name=ferret#nose
-        #  \_/   \______________/\_________/ \_________/ \__/
-        #   |           |            |            |        |
-        # scheme     authority       path        query   fragment
-        new_url = URL(self)
-        new_url._uri_reference = self._uri_reference.copy_with(**kwargs)
-        if new_url.is_absolute_url:
-            new_url._uri_reference = new_url._uri_reference.normalize()
-        return URL(new_url)
+        return URL(self, **kwargs)
 
     def copy_set_param(self, key: str, value: typing.Any = None) -> "URL":
         return self.copy_with(params=self.params.set(key, value))
@@ -501,21 +369,9 @@ class URL:
         url = url.join("/new/path")
         assert url == "https://www.example.com/new/path"
         """
-        if self.is_relative_url:
-            # Workaround to handle relative URLs, which otherwise raise
-            # rfc3986.exceptions.ResolutionError when used as an argument
-            # in `.resolve_with`.
-            return (
-                self.copy_with(scheme="http", host="example.com")
-                .join(url)
-                .copy_with(scheme=None, host=None)
-            )
+        from urllib.parse import urljoin
 
-        # We drop any fragment portion, because RFC 3986 strictly
-        # treats URLs with a fragment portion as not being absolute URLs.
-        base_uri = self._uri_reference.copy_with(fragment=None)
-        relative_url = URL(url)
-        return URL(relative_url._uri_reference.resolve_with(base_uri).unsplit())
+        return URL(urljoin(str(self), str(URL(url))))
 
     def __hash__(self) -> int:
         return hash(str(self))
@@ -524,21 +380,33 @@ class URL:
         return isinstance(other, (URL, str)) and str(self) == str(URL(other))
 
     def __str__(self) -> str:
-        return typing.cast(str, self._uri_reference.unsplit())
+        return str(self._uri_reference)
 
     def __repr__(self) -> str:
-        class_name = self.__class__.__name__
-        url_str = str(self)
-        if self._uri_reference.userinfo:
-            # Mask any password component in the URL representation, to lower the
-            # risk of unintended leakage, such as in debug information and logging.
-            username = quote(self.username)
-            url_str = (
-                rfc3986.urlparse(url_str)
-                .copy_with(userinfo=f"{username}:[secure]")
-                .unsplit()
-            )
-        return f"{class_name}({url_str!r})"
+        scheme, userinfo, host, port, path, query, fragment = self._uri_reference
+
+        if ":" in userinfo:
+            # Mask any password component.
+            userinfo = f'{userinfo.split(":")[0]}:[secure]'
+
+        authority = "".join(
+            [
+                f"{userinfo}@" if userinfo else "",
+                f"[{host}]" if ":" in host else host,
+                f":{port}" if port is not None else "",
+            ]
+        )
+        url = "".join(
+            [
+                f"{self.scheme}:" if scheme else "",
+                f"//{authority}" if authority else "",
+                path,
+                f"?{query}" if query is not None else "",
+                f"#{fragment}" if fragment is not None else "",
+            ]
+        )
+
+        return f"{self.__class__.__name__}({url!r})"
 
 
 class QueryParams(typing.Mapping[str, str]):
index 31677293180c0b5a7edc48cce215328e65d1eb45..b11c02825ba479682dea668b03fc40b130f650fd 100644 (file)
@@ -30,7 +30,7 @@ classifiers = [
 dependencies = [
     "certifi",
     "httpcore>=0.15.0,<0.17.0",
-    "rfc3986[idna2008]>=1.3,<2",
+    "idna",
     "sniffio",
 ]
 dynamic = ["readme", "version"]
index 959681be9f53bc4a2bda8c470877779858431dce..cbd8d6426930a5c7111391711be67f2e7efbab46 100644 (file)
@@ -312,49 +312,13 @@ def test_url_copywith_security():
     """
     Prevent unexpected changes on URL after calling copy_with (CVE-2021-41945)
     """
-    url = httpx.URL("https://u:p@[invalid!]//evilHost/path?t=w#tw")
-    original_scheme = url.scheme
-    original_userinfo = url.userinfo
-    original_netloc = url.netloc
-    original_raw_path = url.raw_path
-    original_query = url.query
-    original_fragment = url.fragment
-    url = url.copy_with()
-    assert url.scheme == original_scheme
-    assert url.userinfo == original_userinfo
-    assert url.netloc == original_netloc
-    assert url.raw_path == original_raw_path
-    assert url.query == original_query
-    assert url.fragment == original_fragment
-
-    url = httpx.URL("https://u:p@[invalid!]//evilHost/path?t=w#tw")
-    original_scheme = url.scheme
-    original_netloc = url.netloc
-    original_raw_path = url.raw_path
-    original_query = url.query
-    original_fragment = url.fragment
-    url = url.copy_with(userinfo=b"")
-    assert url.scheme == original_scheme
-    assert url.userinfo == b""
-    assert url.netloc == original_netloc
-    assert url.raw_path == original_raw_path
-    assert url.query == original_query
-    assert url.fragment == original_fragment
+    with pytest.raises(httpx.InvalidURL):
+        httpx.URL("https://u:p@[invalid!]//evilHost/path?t=w#tw")
 
     url = httpx.URL("https://example.com/path?t=w#tw")
-    original_userinfo = url.userinfo
-    original_netloc = url.netloc
-    original_raw_path = url.raw_path
-    original_query = url.query
-    original_fragment = url.fragment
     bad = "https://xxxx:xxxx@xxxxxxx/xxxxx/xxx?x=x#xxxxx"
-    url = url.copy_with(scheme=bad)
-    assert url.scheme == bad
-    assert url.userinfo == original_userinfo
-    assert url.netloc == original_netloc
-    assert url.raw_path == original_raw_path
-    assert url.query == original_query
-    assert url.fragment == original_fragment
+    with pytest.raises(httpx.InvalidURL):
+        url.copy_with(scheme=bad)
 
 
 def test_url_invalid():
@@ -426,6 +390,14 @@ def test_ipv6_url_from_raw_url(host):
     assert str(url) == "https://[::ffff:192.168.0.1]/"
 
 
+def test_resolution_error_1833():
+    """
+    See https://github.com/encode/httpx/issues/1833
+    """
+    url = httpx.URL("https://example.com/?[]")
+    assert url.join("/") == "https://example.com/"
+
+
 def test_url_raw_compatibility():
     url = httpx.URL("https://www.example.com/path")
     scheme, host, port, raw_path = url.raw
index 9c877deaf9331750214e7b06ceb3731df2bcfd9c..17c8d5eb278faea6a9e8d3220104723df12ef322 100644 (file)
@@ -116,7 +116,7 @@ async def test_asgi_raw_path():
         response = await client.get(url)
 
     assert response.status_code == 200
-    assert response.json() == {"raw_path": "/user%40example.org"}
+    assert response.json() == {"raw_path": "/user@example.org"}
 
 
 @pytest.mark.anyio
diff --git a/tests/test_urlparse.py b/tests/test_urlparse.py
new file mode 100644 (file)
index 0000000..e48ffa6
--- /dev/null
@@ -0,0 +1,232 @@
+import pytest
+
+import httpx
+from httpx._urlparse import urlparse
+
+
+def test_urlparse():
+    url = urlparse("https://www.example.com/")
+
+    assert url.scheme == "https"
+    assert url.userinfo == ""
+    assert url.netloc == "www.example.com"
+    assert url.host == "www.example.com"
+    assert url.port is None
+    assert url.path == "/"
+    assert url.query is None
+    assert url.fragment is None
+
+    assert str(url) == "https://www.example.com/"
+
+
+def test_urlparse_no_scheme():
+    url = urlparse("://example.com")
+    assert url.scheme == ""
+    assert url.host == "example.com"
+    assert url.path == ""
+
+
+def test_urlparse_no_authority():
+    url = urlparse("http://")
+    assert url.scheme == "http"
+    assert url.host == ""
+    assert url.path == ""
+
+
+# Tests for different host types
+
+
+def test_urlparse_valid_host():
+    url = urlparse("https://example.com/")
+    assert url.host == "example.com"
+
+
+def test_urlparse_normalized_host():
+    url = urlparse("https://EXAMPLE.com/")
+    assert url.host == "example.com"
+
+
+def test_urlparse_valid_ipv4():
+    url = urlparse("https://1.2.3.4/")
+    assert url.host == "1.2.3.4"
+
+
+def test_urlparse_invalid_ipv4():
+    with pytest.raises(httpx.InvalidURL) as exc:
+        urlparse("https://999.999.999.999/")
+    assert str(exc.value) == "Invalid IPv4 address"
+
+
+def test_urlparse_valid_ipv6():
+    url = urlparse("https://[2001:db8::ff00:42:8329]/")
+    assert url.host == "2001:db8::ff00:42:8329"
+
+
+def test_urlparse_invalid_ipv6():
+    with pytest.raises(httpx.InvalidURL) as exc:
+        urlparse("https://[2001]/")
+    assert str(exc.value) == "Invalid IPv6 address"
+
+
+def test_urlparse_unescaped_idna_host():
+    url = urlparse("https://中国.icom.museum/")
+    assert url.host == "xn--fiqs8s.icom.museum"
+
+
+def test_urlparse_escaped_idna_host():
+    url = urlparse("https://xn--fiqs8s.icom.museum/")
+    assert url.host == "xn--fiqs8s.icom.museum"
+
+
+def test_urlparse_invalid_idna_host():
+    with pytest.raises(httpx.InvalidURL) as exc:
+        urlparse("https://☃.com/")
+    assert str(exc.value) == "Invalid IDNA hostname"
+
+
+# Tests for different port types
+
+
+def test_urlparse_valid_port():
+    url = urlparse("https://example.com:123/")
+    assert url.port == 123
+
+
+def test_urlparse_normalized_port():
+    # If the port matches the scheme default it is normalized to None.
+    url = urlparse("https://example.com:443/")
+    assert url.port is None
+
+
+def test_urlparse_invalid_port():
+    with pytest.raises(httpx.InvalidURL) as exc:
+        urlparse("https://example.com:abc/")
+    assert str(exc.value) == "Invalid port"
+
+
+# Tests for path handling
+
+
+def test_urlparse_normalized_path():
+    url = urlparse("https://example.com/abc/def/../ghi/./jkl")
+    assert url.path == "/abc/ghi/jkl"
+
+
+def test_urlparse_escaped_path():
+    url = urlparse("https://example.com/ /🌟/")
+    assert url.path == "/%20/%F0%9F%8C%9F/"
+
+
+def test_urlparse_leading_dot_prefix_on_absolute_url():
+    url = urlparse("https://example.com/../abc")
+    assert url.path == "/abc"
+
+
+def test_urlparse_leading_dot_prefix_on_relative_url():
+    url = urlparse("../abc")
+    assert url.path == "../abc"
+
+
+# Tests for invalid URLs
+
+
+def test_urlparse_excessively_long_url():
+    with pytest.raises(httpx.InvalidURL) as exc:
+        urlparse("https://www.example.com/" + "x" * 100_000)
+    assert str(exc.value) == "URL too long"
+
+
+def test_urlparse_excessively_long_component():
+    with pytest.raises(httpx.InvalidURL) as exc:
+        urlparse("https://www.example.com", path="/" + "x" * 100_000)
+    assert str(exc.value) == "URL component 'path' too long"
+
+
+def test_urlparse_non_printing_character_in_url():
+    with pytest.raises(httpx.InvalidURL) as exc:
+        urlparse("https://www.example.com/\n")
+    assert str(exc.value) == "Invalid non-printable ASCII character in URL"
+
+
+def test_urlparse_non_printing_character_in_component():
+    with pytest.raises(httpx.InvalidURL) as exc:
+        urlparse("https://www.example.com", path="/\n")
+    assert (
+        str(exc.value)
+        == "Invalid non-printable ASCII character in URL component 'path'"
+    )
+
+
+# Test for urlparse components
+
+
+def test_urlparse_with_components():
+    url = urlparse(scheme="https", host="www.example.com", path="/")
+
+    assert url.scheme == "https"
+    assert url.userinfo == ""
+    assert url.host == "www.example.com"
+    assert url.port is None
+    assert url.path == "/"
+    assert url.query is None
+    assert url.fragment is None
+
+    assert str(url) == "https://www.example.com/"
+
+
+def test_urlparse_with_invalid_component():
+    with pytest.raises(TypeError) as exc:
+        urlparse(scheme="https", host="www.example.com", incorrect="/")
+    assert str(exc.value) == "'incorrect' is an invalid keyword argument for urlparse()"
+
+
+def test_urlparse_with_invalid_scheme():
+    with pytest.raises(httpx.InvalidURL) as exc:
+        urlparse(scheme="~", host="www.example.com", path="/")
+    assert str(exc.value) == "Invalid URL component 'scheme'"
+
+
+def test_urlparse_with_invalid_path():
+    with pytest.raises(httpx.InvalidURL) as exc:
+        urlparse(scheme="https", host="www.example.com", path="abc")
+    assert str(exc.value) == "For absolute URLs, path must be empty or begin with '/'"
+
+    with pytest.raises(httpx.InvalidURL) as exc:
+        urlparse(path="//abc")
+    assert (
+        str(exc.value)
+        == "URLs with no authority component cannot have a path starting with '//'"
+    )
+
+    with pytest.raises(httpx.InvalidURL) as exc:
+        urlparse(path=":abc")
+    assert (
+        str(exc.value)
+        == "URLs with no scheme component cannot have a path starting with ':'"
+    )
+
+
+def test_urlparse_with_relative_path():
+    # This path would be invalid for an absolute URL, but is valid as a relative URL.
+    url = urlparse(path="abc")
+    assert url.path == "abc"
+
+
+# Tests for accessing and modifying `urlparse` results.
+
+
+def test_copy_with():
+    url = urlparse("https://www.example.com/")
+    assert str(url) == "https://www.example.com/"
+
+    url = url.copy_with()
+    assert str(url) == "https://www.example.com/"
+
+    url = url.copy_with(scheme="http")
+    assert str(url) == "http://www.example.com/"
+
+    url = url.copy_with(netloc="example.com")
+    assert str(url) == "http://example.com/"
+
+    url = url.copy_with(path="/abc")
+    assert str(url) == "http://example.com/abc"