]> git.ipfire.org Git - thirdparty/httpx.git/commitdiff
Move remaining utility functions from _utils.py to _models.py (#3387)
authorRafaelWO <38643099+RafaelWO@users.noreply.github.com>
Fri, 1 Nov 2024 19:20:18 +0000 (20:20 +0100)
committerGitHub <noreply@github.com>
Fri, 1 Nov 2024 19:20:18 +0000 (19:20 +0000)
httpx/_models.py
httpx/_utils.py
scripts/lint
tests/models/test_headers.py
tests/test_utils.py

index 16c4d1cf56207afc14f9553f1e2b3b75f4d54ef8..e7c992f77e6bbdf0ab46fa4da758efc2224e2438 100644 (file)
@@ -1,8 +1,10 @@
 from __future__ import annotations
 
+import codecs
 import datetime
 import email.message
 import json as jsonlib
+import re
 import typing
 import urllib.request
 from collections.abc import Mapping
@@ -44,15 +46,23 @@ from ._types import (
     SyncByteStream,
 )
 from ._urls import URL
-from ._utils import (
-    is_known_encoding,
-    obfuscate_sensitive_headers,
-    parse_content_type_charset,
-    parse_header_links,
-)
+from ._utils import to_bytes_or_str, to_str
 
 __all__ = ["Cookies", "Headers", "Request", "Response"]
 
+SENSITIVE_HEADERS = {"authorization", "proxy-authorization"}
+
+
+def _is_known_encoding(encoding: str) -> bool:
+    """
+    Return `True` if `encoding` is a known codec.
+    """
+    try:
+        codecs.lookup(encoding)
+    except LookupError:
+        return False
+    return True
+
 
 def _normalize_header_key(key: str | bytes, encoding: str | None = None) -> bytes:
     """
@@ -72,6 +82,60 @@ def _normalize_header_value(value: str | bytes, encoding: str | None = None) ->
     return value.encode(encoding or "ascii")
 
 
+def _parse_content_type_charset(content_type: str) -> str | None:
+    # We used to use `cgi.parse_header()` here, but `cgi` became a dead battery.
+    # See: https://peps.python.org/pep-0594/#cgi
+    msg = email.message.Message()
+    msg["content-type"] = content_type
+    return msg.get_content_charset(failobj=None)
+
+
+def _parse_header_links(value: str) -> list[dict[str, str]]:
+    """
+    Returns a list of parsed link headers, for more info see:
+    https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link
+    The generic syntax of those is:
+    Link: < uri-reference >; param1=value1; param2="value2"
+    So for instance:
+    Link; '<http:/.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;'
+    would return
+        [
+            {"url": "http:/.../front.jpeg", "type": "image/jpeg"},
+            {"url": "http://.../back.jpeg"},
+        ]
+    :param value: HTTP Link entity-header field
+    :return: list of parsed link headers
+    """
+    links: list[dict[str, str]] = []
+    replace_chars = " '\""
+    value = value.strip(replace_chars)
+    if not value:
+        return links
+    for val in re.split(", *<", value):
+        try:
+            url, params = val.split(";", 1)
+        except ValueError:
+            url, params = val, ""
+        link = {"url": url.strip("<> '\"")}
+        for param in params.split(";"):
+            try:
+                key, value = param.split("=")
+            except ValueError:
+                break
+            link[key.strip(replace_chars)] = value.strip(replace_chars)
+        links.append(link)
+    return links
+
+
+def _obfuscate_sensitive_headers(
+    items: typing.Iterable[tuple[typing.AnyStr, typing.AnyStr]],
+) -> typing.Iterator[tuple[typing.AnyStr, typing.AnyStr]]:
+    for k, v in items:
+        if to_str(k.lower()) in SENSITIVE_HEADERS:
+            v = to_bytes_or_str("[secure]", match_type_of=v)
+        yield k, v
+
+
 class Headers(typing.MutableMapping[str, str]):
     """
     HTTP headers, as a case-insensitive multi-dict.
@@ -306,7 +370,7 @@ class Headers(typing.MutableMapping[str, str]):
         if self.encoding != "ascii":
             encoding_str = f", encoding={self.encoding!r}"
 
-        as_list = list(obfuscate_sensitive_headers(self.multi_items()))
+        as_list = list(_obfuscate_sensitive_headers(self.multi_items()))
         as_dict = dict(as_list)
 
         no_duplicate_keys = len(as_dict) == len(as_list)
@@ -599,7 +663,7 @@ class Response:
         """
         if not hasattr(self, "_encoding"):
             encoding = self.charset_encoding
-            if encoding is None or not is_known_encoding(encoding):
+            if encoding is None or not _is_known_encoding(encoding):
                 if isinstance(self.default_encoding, str):
                     encoding = self.default_encoding
                 elif hasattr(self, "_content"):
@@ -630,7 +694,7 @@ class Response:
         if content_type is None:
             return None
 
-        return parse_content_type_charset(content_type)
+        return _parse_content_type_charset(content_type)
 
     def _get_content_decoder(self) -> ContentDecoder:
         """
@@ -785,7 +849,7 @@ class Response:
 
         return {
             (link.get("rel") or link.get("url")): link
-            for link in parse_header_links(header)
+            for link in _parse_header_links(header)
         }
 
     @property
index c873bdb2f0fca48b8ad4e210486462d4af35acc6..9a1ed547498f9c4cd8131d1ef603ea605358bda8 100644 (file)
@@ -1,7 +1,5 @@
 from __future__ import annotations
 
-import codecs
-import email.message
 import ipaddress
 import os
 import re
@@ -29,74 +27,6 @@ def primitive_value_to_str(value: PrimitiveData) -> str:
     return str(value)
 
 
-def is_known_encoding(encoding: str) -> bool:
-    """
-    Return `True` if `encoding` is a known codec.
-    """
-    try:
-        codecs.lookup(encoding)
-    except LookupError:
-        return False
-    return True
-
-
-def parse_header_links(value: str) -> list[dict[str, str]]:
-    """
-    Returns a list of parsed link headers, for more info see:
-    https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link
-    The generic syntax of those is:
-    Link: < uri-reference >; param1=value1; param2="value2"
-    So for instance:
-    Link; '<http:/.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;'
-    would return
-        [
-            {"url": "http:/.../front.jpeg", "type": "image/jpeg"},
-            {"url": "http://.../back.jpeg"},
-        ]
-    :param value: HTTP Link entity-header field
-    :return: list of parsed link headers
-    """
-    links: list[dict[str, str]] = []
-    replace_chars = " '\""
-    value = value.strip(replace_chars)
-    if not value:
-        return links
-    for val in re.split(", *<", value):
-        try:
-            url, params = val.split(";", 1)
-        except ValueError:
-            url, params = val, ""
-        link = {"url": url.strip("<> '\"")}
-        for param in params.split(";"):
-            try:
-                key, value = param.split("=")
-            except ValueError:
-                break
-            link[key.strip(replace_chars)] = value.strip(replace_chars)
-        links.append(link)
-    return links
-
-
-def parse_content_type_charset(content_type: str) -> str | None:
-    # We used to use `cgi.parse_header()` here, but `cgi` became a dead battery.
-    # See: https://peps.python.org/pep-0594/#cgi
-    msg = email.message.Message()
-    msg["content-type"] = content_type
-    return msg.get_content_charset(failobj=None)
-
-
-SENSITIVE_HEADERS = {"authorization", "proxy-authorization"}
-
-
-def obfuscate_sensitive_headers(
-    items: typing.Iterable[tuple[typing.AnyStr, typing.AnyStr]],
-) -> typing.Iterator[tuple[typing.AnyStr, typing.AnyStr]]:
-    for k, v in items:
-        if to_str(k.lower()) in SENSITIVE_HEADERS:
-            v = to_bytes_or_str("[secure]", match_type_of=v)
-        yield k, v
-
-
 def port_or_default(url: URL) -> int | None:
     if url.port is not None:
         return url.port
index 3d8685a0652f613afc1dbc9457b6e530dabec172..6d096d760be626da1e675eb105e4b8a586998533 100755 (executable)
@@ -8,5 +8,5 @@ export SOURCE_FILES="httpx tests"
 
 set -x
 
-${PREFIX}ruff --fix $SOURCE_FILES
+${PREFIX}ruff check --fix $SOURCE_FILES
 ${PREFIX}ruff format $SOURCE_FILES
index d671dc41869cec7c8b5b9edc98377b4af7dff866..a87a446784b003b0e62b87408b9d057773c1b2fe 100644 (file)
@@ -174,3 +174,46 @@ def test_sensitive_headers(header):
     value = "s3kr3t"
     h = httpx.Headers({header: value})
     assert repr(h) == "Headers({'%s': '[secure]'})" % header
+
+
+@pytest.mark.parametrize(
+    "headers, output",
+    [
+        ([("content-type", "text/html")], [("content-type", "text/html")]),
+        ([("authorization", "s3kr3t")], [("authorization", "[secure]")]),
+        ([("proxy-authorization", "s3kr3t")], [("proxy-authorization", "[secure]")]),
+    ],
+)
+def test_obfuscate_sensitive_headers(headers, output):
+    as_dict = {k: v for k, v in output}
+    headers_class = httpx.Headers({k: v for k, v in headers})
+    assert repr(headers_class) == f"Headers({as_dict!r})"
+
+
+@pytest.mark.parametrize(
+    "value, expected",
+    (
+        (
+            '<http:/.../front.jpeg>; rel=front; type="image/jpeg"',
+            [{"url": "http:/.../front.jpeg", "rel": "front", "type": "image/jpeg"}],
+        ),
+        ("<http:/.../front.jpeg>", [{"url": "http:/.../front.jpeg"}]),
+        ("<http:/.../front.jpeg>;", [{"url": "http:/.../front.jpeg"}]),
+        (
+            '<http:/.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;',
+            [
+                {"url": "http:/.../front.jpeg", "type": "image/jpeg"},
+                {"url": "http://.../back.jpeg"},
+            ],
+        ),
+        ("", []),
+    ),
+)
+def test_parse_header_links(value, expected):
+    all_links = httpx.Response(200, headers={"link": value}).links.values()
+    assert all(link in all_links for link in expected)
+
+
+def test_parse_header_links_no_link():
+    all_links = httpx.Response(200).links
+    assert all_links == {}
index f7e6c1642acea1f756fd4c087111965e79f1befd..3e2abdef28eddadde9e7bc6cd6f3fe555f47dbad 100644 (file)
@@ -53,35 +53,6 @@ def test_guess_by_bom(encoding, expected):
     assert response.json() == {"abc": 123}
 
 
-@pytest.mark.parametrize(
-    "value, expected",
-    (
-        (
-            '<http:/.../front.jpeg>; rel=front; type="image/jpeg"',
-            [{"url": "http:/.../front.jpeg", "rel": "front", "type": "image/jpeg"}],
-        ),
-        ("<http:/.../front.jpeg>", [{"url": "http:/.../front.jpeg"}]),
-        ("<http:/.../front.jpeg>;", [{"url": "http:/.../front.jpeg"}]),
-        (
-            '<http:/.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;',
-            [
-                {"url": "http:/.../front.jpeg", "type": "image/jpeg"},
-                {"url": "http://.../back.jpeg"},
-            ],
-        ),
-        ("", []),
-    ),
-)
-def test_parse_header_links(value, expected):
-    all_links = httpx.Response(200, headers={"link": value}).links.values()
-    assert all(link in all_links for link in expected)
-
-
-def test_parse_header_links_no_link():
-    all_links = httpx.Response(200).links
-    assert all_links == {}
-
-
 def test_logging_request(server, caplog):
     caplog.set_level(logging.INFO)
     with httpx.Client() as client:
@@ -144,20 +115,6 @@ def test_get_environment_proxies(environment, proxies):
     assert get_environment_proxies() == proxies
 
 
-@pytest.mark.parametrize(
-    "headers, output",
-    [
-        ([("content-type", "text/html")], [("content-type", "text/html")]),
-        ([("authorization", "s3kr3t")], [("authorization", "[secure]")]),
-        ([("proxy-authorization", "s3kr3t")], [("proxy-authorization", "[secure]")]),
-    ],
-)
-def test_obfuscate_sensitive_headers(headers, output):
-    as_dict = {k: v for k, v in output}
-    headers_class = httpx.Headers({k: v for k, v in headers})
-    assert repr(headers_class) == f"Headers({as_dict!r})"
-
-
 def test_same_origin():
     origin = httpx.URL("https://example.com")
     request = httpx.Request("GET", "HTTPS://EXAMPLE.COM:443")