import httpcore
+from .__version__ import __version__
from ._auth import Auth, BasicAuth, FunctionAuth
from ._config import (
DEFAULT_LIMITS,
create_ssl_context,
)
from ._content_streams import ContentStream
+from ._decoders import SUPPORTED_DECODERS
from ._exceptions import (
HTTPCORE_EXC_MAP,
InvalidURL,
logger = get_logger(__name__)
KEEPALIVE_EXPIRY = 5.0
+USER_AGENT = f"python-httpx/{__version__}"
+ACCEPT_ENCODING = ", ".join(
+ [key for key in SUPPORTED_DECODERS.keys() if key != "identity"]
+)
class BaseClient:
self._auth = self._build_auth(auth)
self._params = QueryParams(params)
- self._headers = Headers(headers)
+ self.headers = Headers(headers)
self._cookies = Cookies(cookies)
self._timeout = Timeout(timeout)
self.max_redirects = max_redirects
@headers.setter
def headers(self, headers: HeaderTypes) -> None:
- self._headers = Headers(headers)
+ client_headers = Headers(
+ {
+ b"Accept": b"*/*",
+ b"Accept-Encoding": ACCEPT_ENCODING.encode("ascii"),
+ b"Connection": b"keep-alive",
+ b"User-Agent": USER_AGENT.encode("ascii"),
+ }
+ )
+ client_headers.update(headers)
+ self._headers = client_headers
@property
def cookies(self) -> Cookies:
Merge a headers argument together with any headers on the client,
to create the headers used for the outgoing request.
"""
- if headers or self.headers:
- merged_headers = Headers(self.headers)
- merged_headers.update(headers)
- return merged_headers
- return headers
+ merged_headers = Headers(self.headers)
+ merged_headers.update(headers)
+ return merged_headers
def _merge_queryparams(
self, params: QueryParamTypes = None
import rfc3986
import rfc3986.exceptions
-from .__version__ import __version__
from ._content_streams import ByteStream, ContentStream, encode
from ._decoders import (
SUPPORTED_DECODERS,
@property
def username(self) -> str:
- userinfo = self._uri_reference.userinfo or ""
- return unquote(userinfo.partition(":")[0])
+ return unquote(self.userinfo.partition(":")[0])
@property
def password(self) -> str:
- userinfo = self._uri_reference.userinfo or ""
- return unquote(userinfo.partition(":")[2])
+ return unquote(self.userinfo.partition(":")[2])
@property
def host(self) -> str:
return self.get_list(key, split_commas=split_commas)
-USER_AGENT = f"python-httpx/{__version__}"
-ACCEPT_ENCODING = ", ".join(
- [key for key in SUPPORTED_DECODERS.keys() if key != "identity"]
-)
-
-
class Request:
def __init__(
self,
has_content_length = (
"content-length" in self.headers or "transfer-encoding" in self.headers
)
- has_user_agent = "user-agent" in self.headers
- has_accept = "accept" in self.headers
- has_accept_encoding = "accept-encoding" in self.headers
- has_connection = "connection" in self.headers
-
- if not has_host:
- url = self.url
- if url.userinfo:
- url = url.copy_with(username=None, password=None)
- auto_headers.append((b"host", url.authority.encode("ascii")))
+
+ if not has_host and self.url.authority:
+ host = self.url.copy_with(username=None, password=None).authority
+ auto_headers.append((b"host", host.encode("ascii")))
if not has_content_length and self.method in ("POST", "PUT", "PATCH"):
auto_headers.append((b"content-length", b"0"))
- if not has_user_agent:
- auto_headers.append((b"user-agent", USER_AGENT.encode("ascii")))
- if not has_accept:
- auto_headers.append((b"accept", b"*/*"))
- if not has_accept_encoding:
- auto_headers.append((b"accept-encoding", ACCEPT_ENCODING.encode()))
- if not has_connection:
- auto_headers.append((b"connection", b"keep-alive"))
self.headers = Headers(auto_headers + self.headers.raw)
}
+def test_remove_default_header():
+ """
+ Remove a default header from the Client.
+ """
+ url = "http://example.org/echo_headers"
+
+ client = httpx.Client(transport=MockTransport())
+ del client.headers["User-Agent"]
+
+ response = client.get(url)
+
+ assert response.status_code == 200
+ assert response.json() == {
+ "headers": {
+ "accept": "*/*",
+ "accept-encoding": "gzip, deflate, br",
+ "connection": "keep-alive",
+ "host": "example.org",
+ }
+ }
+
+
def test_header_does_not_exist():
headers = httpx.Headers({"foo": "bar"})
with pytest.raises(KeyError):
assert request.content == b'{"test": 123}'
+def test_headers():
+ request = httpx.Request("POST", "http://example.org", json={"test": 123})
+
+ assert request.headers == httpx.Headers(
+ {
+ "Host": "example.org",
+ "Content-Type": "application/json",
+ "Content-Length": "13",
+ }
+ )
+
+
def test_read_and_stream_data():
# Ensure a request may still be streamed if it has been read.
# Needed for cases such as authentication classes that read the request body.