+# Timeout exceptions...
+
+
class Timeout(Exception):
"""
A base class for all timeouts.
"""
+# HTTP exceptions...
+
+
+class ProtocolError(Exception):
+ """
+ Malformed HTTP.
+ """
+
+
+# Redirect exceptions...
+
+
class RedirectError(Exception):
"""
Base class for HTTP redirect errors.
"""
-class ProtocolError(Exception):
- """
- Malformed HTTP.
- """
+# Response exceptions...
class StreamConsumed(Exception):
A case-insensitive multidict.
"""
- def __init__(self, headers: HeaderTypes = None) -> None:
+ def __init__(self, headers: HeaderTypes = None, encoding: str = None) -> None:
if headers is None:
self._list = [] # type: typing.List[typing.Tuple[bytes, bytes]]
elif isinstance(headers, Headers):
self._list = list(headers.raw)
elif isinstance(headers, dict):
self._list = [
- (normalize_header_key(k), normalize_header_value(v))
+ (normalize_header_key(k, encoding), normalize_header_value(v, encoding))
for k, v in headers.items()
]
else:
self._list = [
- (normalize_header_key(k), normalize_header_value(v)) for k, v in headers
+ (normalize_header_key(k, encoding), normalize_header_value(v, encoding))
+ for k, v in headers
]
+ self._encoding = encoding
+
+ @property
+ def encoding(self) -> str:
+ """
+ Header encoding is mandated as ascii, but utf-8 or iso-8859-1 may be
+ seen in the wild.
+ """
+ if self._encoding is None:
+ for encoding in ["ascii", "utf-8"]:
+ for key, value in self.raw:
+ try:
+ key.decode(encoding)
+ value.decode(encoding)
+ except UnicodeDecodeError:
+ break
+ else:
+ # The else block runs if 'break' did not occur, meaning
+ # all values fitted the encoding.
+ self._encoding = encoding
+ break
+ else:
+ # The ISO-8859-1 encoding covers all 256 code points in a byte,
+ # so will never raise decode errors.
+ self._encoding = "iso-8859-1"
+ return self._encoding
+
+ @encoding.setter
+ def encoding(self, value: str) -> None:
+ self._encoding = value
@property
def raw(self) -> typing.List[typing.Tuple[bytes, bytes]]:
+ """
+ Returns a list of the raw header items, as byte pairs.
+ May be mutated in-place.
+ """
return self._list
def keys(self) -> typing.List[str]: # type: ignore
- return [key.decode("latin-1") for key, value in self._list]
+ return [key.decode(self.encoding) for key, value in self._list]
def values(self) -> typing.List[str]: # type: ignore
- return [value.decode("latin-1") for key, value in self._list]
+ return [value.decode(self.encoding) for key, value in self._list]
def items(self) -> typing.List[typing.Tuple[str, str]]: # type: ignore
return [
- (key.decode("latin-1"), value.decode("latin-1"))
+ (key.decode(self.encoding), value.decode(self.encoding))
for key, value in self._list
]
except KeyError:
return default
- def getlist(self, key: str) -> typing.List[str]:
- get_header_key = key.lower().encode("latin-1")
- return [
- item_value.decode("latin-1")
+ def getlist(self, key: str, default: typing.Any = None, split_commas = None) -> typing.List[str]:
+ """
+ Return multiple header values.
+
+ If there are header values that include commas, then we default to
+ spliting them into multiple results, except for Set-Cookie.
+
+ See: https://tools.ietf.org/html/rfc7230#section-3.2.2
+ """
+ get_header_key = key.lower().encode(self.encoding)
+ if split_commas is None:
+ split_commas = get_header_key != b'set-cookie'
+
+ values = [
+ item_value.decode(self.encoding)
for item_key, item_value in self._list
if item_key == get_header_key
]
+ if not values:
+ return [] if default is None else default
+
+ if not split_commas:
+ return values
+
+ split_values = []
+ for value in values:
+ split_values.extend([item.strip() for item in value.split(",")])
+ return split_values
+
def __getitem__(self, key: str) -> str:
- get_header_key = key.lower().encode("latin-1")
+ """
+ Return a single header value.
+
+ If there are multiple headers with the same key, then we concatenate
+ them with commas. See: https://tools.ietf.org/html/rfc7230#section-3.2.2
+ """
+ normalized_key = key.lower().encode(self.encoding)
+
+ items = []
for header_key, header_value in self._list:
- if header_key == get_header_key:
- return header_value.decode("latin-1")
+ if header_key == normalized_key:
+ items.append(header_value.decode(self.encoding))
+
+ if items:
+ return ", ".join(items)
+
raise KeyError(key)
def __setitem__(self, key: str, value: str) -> None:
Set the header `key` to `value`, removing any duplicate entries.
Retains insertion order.
"""
- set_key = key.lower().encode("latin-1")
- set_value = value.encode("latin-1")
+ set_key = key.lower().encode(self.encoding)
+ set_value = value.encode(self.encoding)
found_indexes = []
for idx, (item_key, item_value) in enumerate(self._list):
"""
Remove the header `key`.
"""
- del_key = key.lower().encode("latin-1")
+ del_key = key.lower().encode(self.encoding)
pop_indexes = []
for idx, (item_key, item_value) in enumerate(self._list):
del self._list[idx]
def __contains__(self, key: typing.Any) -> bool:
- get_header_key = key.lower().encode("latin-1")
+ get_header_key = key.lower().encode(self.encoding)
for header_key, header_value in self._list:
if header_key == get_header_key:
return True
def __repr__(self) -> str:
class_name = self.__class__.__name__
+
+ encoding_str = ""
+ if self.encoding != "ascii":
+ encoding_str = f", encoding={self.encoding!r}"
+
as_dict = dict(self.items())
if len(as_dict) == len(self):
- return f"{class_name}({as_dict!r})"
- return f"{class_name}(raw={self.raw!r})"
+ return f"{class_name}({as_dict!r}{encoding_str})"
+ as_list = self.items()
+ return f"{class_name}({as_list!r}{encoding_str})"
class Request:
"""
if not hasattr(self, "_decoder"):
decoders = [] # type: typing.List[Decoder]
- value = self.headers.get("content-encoding", "identity")
- for part in value.split(","):
- part = part.strip().lower()
- decoder_cls = SUPPORTED_DECODERS[part]
+ values = self.headers.getlist("content-encoding", ["identity"])
+ for value in values:
+ value = value.strip().lower()
+ decoder_cls = SUPPORTED_DECODERS[value]
decoders.append(decoder_cls())
if len(decoders) == 1:
return quote(uri, safe=safe_without_percent)
-def normalize_header_key(value: typing.AnyStr) -> bytes:
+def normalize_header_key(value: typing.AnyStr, encoding: str = None) -> bytes:
"""
Coerce str/bytes into a strictly byte-wise HTTP header key.
"""
if isinstance(value, bytes):
return value.lower()
- return value.encode("latin-1").lower()
+ return value.encode(encoding or "ascii").lower()
-def normalize_header_value(value: typing.AnyStr) -> bytes:
+def normalize_header_value(value: typing.AnyStr, encoding: str = None) -> bytes:
"""
Coerce str/bytes into a strictly byte-wise HTTP header value.
"""
if isinstance(value, bytes):
return value
- return value.encode("latin-1")
+ return value.encode(encoding or "ascii")
def get_reason_phrase(status_code: int) -> str:
--- /dev/null
+import httpcore
+
+
+def test_headers():
+ h = httpcore.Headers([("a", "123"), ("a", "456"), ("b", "789")])
+ assert "a" in h
+ assert "A" in h
+ assert "b" in h
+ assert "B" in h
+ assert "c" not in h
+ assert h["a"] == "123, 456"
+ assert h.get("a") == "123, 456"
+ assert h.get("nope", default=None) is None
+ assert h.getlist("a") == ["123", "456"]
+ assert h.keys() == ["a", "a", "b"]
+ assert h.values() == ["123", "456", "789"]
+ assert h.items() == [("a", "123"), ("a", "456"), ("b", "789")]
+ assert list(h) == ["a", "a", "b"]
+ assert dict(h) == {"a": "123, 456", "b": "789"}
+ assert repr(h) == "Headers([('a', '123'), ('a', '456'), ('b', '789')])"
+ assert h == httpcore.Headers([("a", "123"), ("b", "789"), ("a", "456")])
+ assert h != [("a", "123"), ("A", "456"), ("b", "789")]
+
+ h = httpcore.Headers({"a": "123", "b": "789"})
+ assert h["A"] == "123"
+ assert h["B"] == "789"
+ assert h.raw == [(b"a", b"123"), (b"b", b"789")]
+ assert repr(h) == "Headers({'a': '123', 'b': '789'})"
+
+
+def test_header_mutations():
+ h = httpcore.Headers()
+ assert dict(h) == {}
+ h["a"] = "1"
+ assert dict(h) == {"a": "1"}
+ h["a"] = "2"
+ assert dict(h) == {"a": "2"}
+ h.setdefault("a", "3")
+ assert dict(h) == {"a": "2"}
+ h.setdefault("b", "4")
+ assert dict(h) == {"a": "2", "b": "4"}
+ del h["a"]
+ assert dict(h) == {"b": "4"}
+ assert h.raw == [(b"b", b"4")]
+
+
+def test_copy_headers():
+ headers = httpcore.Headers({"custom": "example"})
+ headers_copy = httpcore.Headers(headers)
+ assert headers == headers_copy
+
+
+def test_headers_insert_retains_ordering():
+ headers = httpcore.Headers({"a": "a", "b": "b", "c": "c"})
+ headers["b"] = "123"
+ assert list(headers.values()) == ["a", "123", "c"]
+
+
+def test_headers_insert_appends_if_new():
+ headers = httpcore.Headers({"a": "a", "b": "b", "c": "c"})
+ headers["d"] = "123"
+ assert list(headers.values()) == ["a", "b", "c", "123"]
+
+
+def test_headers_insert_removes_all_existing():
+ headers = httpcore.Headers([("a", "123"), ("a", "456")])
+ headers["a"] = "789"
+ assert dict(headers) == {"a": "789"}
+
+
+def test_headers_delete_removes_all_existing():
+ headers = httpcore.Headers([("a", "123"), ("a", "456")])
+ del headers["a"]
+ assert dict(headers) == {}
+
+
+def test_headers_dict_repr():
+ """
+ Headers should display with a dict repr by default.
+ """
+ headers = httpcore.Headers({"custom": "example"})
+ assert repr(headers) == "Headers({'custom': 'example'})"
+
+
+def test_headers_encoding_in_repr():
+ """
+ Headers should display an encoding in the repr if required.
+ """
+ headers = httpcore.Headers({b"custom": "example ☃".encode("utf-8")})
+ assert repr(headers) == "Headers({'custom': 'example ☃'}, encoding='utf-8')"
+
+
+def test_headers_list_repr():
+ """
+ Headers should display with a list repr if they include multiple identical keys.
+ """
+ headers = httpcore.Headers([("custom", "example 1"), ("custom", "example 2")])
+ assert (
+ repr(headers) == "Headers([('custom', 'example 1'), ('custom', 'example 2')])"
+ )
+
+
+def test_headers_decode_ascii():
+ """
+ Headers should decode as ascii by default.
+ """
+ raw_headers = [(b"Custom", b"Example")]
+ headers = httpcore.Headers(raw_headers)
+ assert dict(headers) == {"custom": "Example"}
+ assert headers.encoding == "ascii"
+
+
+def test_headers_decode_utf_8():
+ """
+ Headers containing non-ascii codepoints should default to decoding as utf-8.
+ """
+ raw_headers = [(b"Custom", "Code point: ☃".encode("utf-8"))]
+ headers = httpcore.Headers(raw_headers)
+ assert dict(headers) == {"custom": "Code point: ☃"}
+ assert headers.encoding == "utf-8"
+
+
+def test_headers_decode_iso_8859_1():
+ """
+ Headers containing non-UTF-8 codepoints should default to decoding as iso-8859-1.
+ """
+ raw_headers = [(b"Custom", "Code point: ÿ".encode("iso-8859-1"))]
+ headers = httpcore.Headers(raw_headers)
+ assert dict(headers) == {"custom": "Code point: ÿ"}
+ assert headers.encoding == "iso-8859-1"
+
+
+def test_headers_decode_explicit_encoding():
+ """
+ An explicit encoding may be set on headers in order to force a
+ particular decoding.
+ """
+ raw_headers = [(b"Custom", "Code point: ☃".encode("utf-8"))]
+ headers = httpcore.Headers(raw_headers)
+ headers.encoding = "iso-8859-1"
+ print(headers)
+ assert dict(headers) == {"custom": "Code point: â\x98\x83"}
+ assert headers.encoding == "iso-8859-1"
+
+
+def test_multiple_headers():
+ """
+ Most headers should split by commas for `getlist`, except 'Set-Cookie'.
+ """
+ h = httpcore.Headers([('set-cookie', 'a, b'), ('set-cookie', 'c')])
+ h.getlist('Set-Cookie') == ['a, b', 'b']
+
+ h = httpcore.Headers([('vary', 'a, b'), ('vary', 'c')])
+ h.getlist('Vary') == ['a', 'b', 'c']