from __future__ import annotations
+import codecs
import datetime
import email.message
import json as jsonlib
+import re
import typing
import urllib.request
from collections.abc import Mapping
SyncByteStream,
)
from ._urls import URL
-from ._utils import (
- is_known_encoding,
- obfuscate_sensitive_headers,
- parse_content_type_charset,
- parse_header_links,
-)
+from ._utils import to_bytes_or_str, to_str
__all__ = ["Cookies", "Headers", "Request", "Response"]
+SENSITIVE_HEADERS = {"authorization", "proxy-authorization"}
+
+
+def _is_known_encoding(encoding: str) -> bool:
+ """
+ Return `True` if `encoding` is a known codec.
+ """
+ try:
+ codecs.lookup(encoding)
+ except LookupError:
+ return False
+ return True
+
def _normalize_header_key(key: str | bytes, encoding: str | None = None) -> bytes:
"""
return value.encode(encoding or "ascii")
+def _parse_content_type_charset(content_type: str) -> str | None:
+ # We used to use `cgi.parse_header()` here, but `cgi` became a dead battery.
+ # See: https://peps.python.org/pep-0594/#cgi
+ msg = email.message.Message()
+ msg["content-type"] = content_type
+ return msg.get_content_charset(failobj=None)
+
+
+def _parse_header_links(value: str) -> list[dict[str, str]]:
+ """
+ Returns a list of parsed link headers, for more info see:
+ https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link
+ The generic syntax of those is:
+ Link: < uri-reference >; param1=value1; param2="value2"
+ So for instance:
+ Link; '<http:/.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;'
+ would return
+ [
+ {"url": "http:/.../front.jpeg", "type": "image/jpeg"},
+ {"url": "http://.../back.jpeg"},
+ ]
+ :param value: HTTP Link entity-header field
+ :return: list of parsed link headers
+ """
+ links: list[dict[str, str]] = []
+ replace_chars = " '\""
+ value = value.strip(replace_chars)
+ if not value:
+ return links
+ for val in re.split(", *<", value):
+ try:
+ url, params = val.split(";", 1)
+ except ValueError:
+ url, params = val, ""
+ link = {"url": url.strip("<> '\"")}
+ for param in params.split(";"):
+ try:
+ key, value = param.split("=")
+ except ValueError:
+ break
+ link[key.strip(replace_chars)] = value.strip(replace_chars)
+ links.append(link)
+ return links
+
+
+def _obfuscate_sensitive_headers(
+ items: typing.Iterable[tuple[typing.AnyStr, typing.AnyStr]],
+) -> typing.Iterator[tuple[typing.AnyStr, typing.AnyStr]]:
+ for k, v in items:
+ if to_str(k.lower()) in SENSITIVE_HEADERS:
+ v = to_bytes_or_str("[secure]", match_type_of=v)
+ yield k, v
+
+
class Headers(typing.MutableMapping[str, str]):
"""
HTTP headers, as a case-insensitive multi-dict.
if self.encoding != "ascii":
encoding_str = f", encoding={self.encoding!r}"
- as_list = list(obfuscate_sensitive_headers(self.multi_items()))
+ as_list = list(_obfuscate_sensitive_headers(self.multi_items()))
as_dict = dict(as_list)
no_duplicate_keys = len(as_dict) == len(as_list)
"""
if not hasattr(self, "_encoding"):
encoding = self.charset_encoding
- if encoding is None or not is_known_encoding(encoding):
+ if encoding is None or not _is_known_encoding(encoding):
if isinstance(self.default_encoding, str):
encoding = self.default_encoding
elif hasattr(self, "_content"):
if content_type is None:
return None
- return parse_content_type_charset(content_type)
+ return _parse_content_type_charset(content_type)
def _get_content_decoder(self) -> ContentDecoder:
"""
return {
(link.get("rel") or link.get("url")): link
- for link in parse_header_links(header)
+ for link in _parse_header_links(header)
}
@property
from __future__ import annotations
-import codecs
-import email.message
import ipaddress
import os
import re
return str(value)
-def is_known_encoding(encoding: str) -> bool:
- """
- Return `True` if `encoding` is a known codec.
- """
- try:
- codecs.lookup(encoding)
- except LookupError:
- return False
- return True
-
-
-def parse_header_links(value: str) -> list[dict[str, str]]:
- """
- Returns a list of parsed link headers, for more info see:
- https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link
- The generic syntax of those is:
- Link: < uri-reference >; param1=value1; param2="value2"
- So for instance:
- Link; '<http:/.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;'
- would return
- [
- {"url": "http:/.../front.jpeg", "type": "image/jpeg"},
- {"url": "http://.../back.jpeg"},
- ]
- :param value: HTTP Link entity-header field
- :return: list of parsed link headers
- """
- links: list[dict[str, str]] = []
- replace_chars = " '\""
- value = value.strip(replace_chars)
- if not value:
- return links
- for val in re.split(", *<", value):
- try:
- url, params = val.split(";", 1)
- except ValueError:
- url, params = val, ""
- link = {"url": url.strip("<> '\"")}
- for param in params.split(";"):
- try:
- key, value = param.split("=")
- except ValueError:
- break
- link[key.strip(replace_chars)] = value.strip(replace_chars)
- links.append(link)
- return links
-
-
-def parse_content_type_charset(content_type: str) -> str | None:
- # We used to use `cgi.parse_header()` here, but `cgi` became a dead battery.
- # See: https://peps.python.org/pep-0594/#cgi
- msg = email.message.Message()
- msg["content-type"] = content_type
- return msg.get_content_charset(failobj=None)
-
-
-SENSITIVE_HEADERS = {"authorization", "proxy-authorization"}
-
-
-def obfuscate_sensitive_headers(
- items: typing.Iterable[tuple[typing.AnyStr, typing.AnyStr]],
-) -> typing.Iterator[tuple[typing.AnyStr, typing.AnyStr]]:
- for k, v in items:
- if to_str(k.lower()) in SENSITIVE_HEADERS:
- v = to_bytes_or_str("[secure]", match_type_of=v)
- yield k, v
-
-
def port_or_default(url: URL) -> int | None:
if url.port is not None:
return url.port
set -x
-${PREFIX}ruff --fix $SOURCE_FILES
+${PREFIX}ruff check --fix $SOURCE_FILES
${PREFIX}ruff format $SOURCE_FILES
value = "s3kr3t"
h = httpx.Headers({header: value})
assert repr(h) == "Headers({'%s': '[secure]'})" % header
+
+
+@pytest.mark.parametrize(
+ "headers, output",
+ [
+ ([("content-type", "text/html")], [("content-type", "text/html")]),
+ ([("authorization", "s3kr3t")], [("authorization", "[secure]")]),
+ ([("proxy-authorization", "s3kr3t")], [("proxy-authorization", "[secure]")]),
+ ],
+)
+def test_obfuscate_sensitive_headers(headers, output):
+ as_dict = {k: v for k, v in output}
+ headers_class = httpx.Headers({k: v for k, v in headers})
+ assert repr(headers_class) == f"Headers({as_dict!r})"
+
+
+@pytest.mark.parametrize(
+ "value, expected",
+ (
+ (
+ '<http:/.../front.jpeg>; rel=front; type="image/jpeg"',
+ [{"url": "http:/.../front.jpeg", "rel": "front", "type": "image/jpeg"}],
+ ),
+ ("<http:/.../front.jpeg>", [{"url": "http:/.../front.jpeg"}]),
+ ("<http:/.../front.jpeg>;", [{"url": "http:/.../front.jpeg"}]),
+ (
+ '<http:/.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;',
+ [
+ {"url": "http:/.../front.jpeg", "type": "image/jpeg"},
+ {"url": "http://.../back.jpeg"},
+ ],
+ ),
+ ("", []),
+ ),
+)
+def test_parse_header_links(value, expected):
+ all_links = httpx.Response(200, headers={"link": value}).links.values()
+ assert all(link in all_links for link in expected)
+
+
+def test_parse_header_links_no_link():
+ all_links = httpx.Response(200).links
+ assert all_links == {}
assert response.json() == {"abc": 123}
-@pytest.mark.parametrize(
- "value, expected",
- (
- (
- '<http:/.../front.jpeg>; rel=front; type="image/jpeg"',
- [{"url": "http:/.../front.jpeg", "rel": "front", "type": "image/jpeg"}],
- ),
- ("<http:/.../front.jpeg>", [{"url": "http:/.../front.jpeg"}]),
- ("<http:/.../front.jpeg>;", [{"url": "http:/.../front.jpeg"}]),
- (
- '<http:/.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;',
- [
- {"url": "http:/.../front.jpeg", "type": "image/jpeg"},
- {"url": "http://.../back.jpeg"},
- ],
- ),
- ("", []),
- ),
-)
-def test_parse_header_links(value, expected):
- all_links = httpx.Response(200, headers={"link": value}).links.values()
- assert all(link in all_links for link in expected)
-
-
-def test_parse_header_links_no_link():
- all_links = httpx.Response(200).links
- assert all_links == {}
-
-
def test_logging_request(server, caplog):
caplog.set_level(logging.INFO)
with httpx.Client() as client:
assert get_environment_proxies() == proxies
-@pytest.mark.parametrize(
- "headers, output",
- [
- ([("content-type", "text/html")], [("content-type", "text/html")]),
- ([("authorization", "s3kr3t")], [("authorization", "[secure]")]),
- ([("proxy-authorization", "s3kr3t")], [("proxy-authorization", "[secure]")]),
- ],
-)
-def test_obfuscate_sensitive_headers(headers, output):
- as_dict = {k: v for k, v in output}
- headers_class = httpx.Headers({k: v for k, v in headers})
- assert repr(headers_class) == f"Headers({as_dict!r})"
-
-
def test_same_origin():
origin = httpx.URL("https://example.com")
request = httpx.Request("GET", "HTTPS://EXAMPLE.COM:443")