TimeoutConfig,
)
from .dispatch.connection_pool import ConnectionPool
-from .models import URL, Request, Response
+from .models import URL, BodyTypes, HeaderTypes, Request, Response, URLTypes
class Client:
async def request(
self,
method: str,
- url: typing.Union[str, URL],
+ url: URLTypes,
*,
- body: typing.Union[bytes, typing.AsyncIterator[bytes]] = b"",
- headers: typing.List[typing.Tuple[bytes, bytes]] = [],
+ body: BodyTypes = b"",
+ headers: HeaderTypes = None,
stream: bool = False,
allow_redirects: bool = True,
- ssl: typing.Optional[SSLConfig] = None,
- timeout: typing.Optional[TimeoutConfig] = None,
+ ssl: SSLConfig = None,
+ timeout: TimeoutConfig = None,
) -> Response:
request = Request(method, url, headers=headers, body=body)
self.prepare_request(request)
async def get(
self,
- url: typing.Union[str, URL],
+ url: URLTypes,
*,
- headers: typing.List[typing.Tuple[bytes, bytes]] = [],
+ headers: HeaderTypes = None,
stream: bool = False,
- ssl: typing.Optional[SSLConfig] = None,
- timeout: typing.Optional[TimeoutConfig] = None,
+ allow_redirects: bool = True,
+ ssl: SSLConfig = None,
+ timeout: TimeoutConfig = None,
+ ) -> Response:
+ return await self.request(
+ "GET",
+ url,
+ headers=headers,
+ stream=stream,
+ allow_redirects=allow_redirects,
+ ssl=ssl,
+ timeout=timeout,
+ )
+
+ async def options(
+ self,
+ url: URLTypes,
+ *,
+ headers: HeaderTypes = None,
+ stream: bool = False,
+ allow_redirects: bool = True,
+ ssl: SSLConfig = None,
+ timeout: TimeoutConfig = None,
+ ) -> Response:
+ return await self.request(
+ "OPTIONS",
+ url,
+ headers=headers,
+ stream=stream,
+ allow_redirects=allow_redirects,
+ ssl=ssl,
+ timeout=timeout,
+ )
+
+ async def head(
+ self,
+ url: URLTypes,
+ *,
+ headers: HeaderTypes = None,
+ stream: bool = False,
+ allow_redirects: bool = False, # Note: Differs to usual default.
+ ssl: SSLConfig = None,
+ timeout: TimeoutConfig = None,
) -> Response:
return await self.request(
- "GET", url, headers=headers, stream=stream, ssl=ssl, timeout=timeout
+ "HEAD",
+ url,
+ headers=headers,
+ stream=stream,
+ allow_redirects=allow_redirects,
+ ssl=ssl,
+ timeout=timeout,
)
async def post(
self,
- url: typing.Union[str, URL],
+ url: URLTypes,
*,
- body: typing.Union[bytes, typing.AsyncIterator[bytes]] = b"",
- headers: typing.List[typing.Tuple[bytes, bytes]] = [],
+ body: BodyTypes = b"",
+ headers: HeaderTypes = None,
stream: bool = False,
- ssl: typing.Optional[SSLConfig] = None,
- timeout: typing.Optional[TimeoutConfig] = None,
+ allow_redirects: bool = True,
+ ssl: SSLConfig = None,
+ timeout: TimeoutConfig = None,
) -> Response:
return await self.request(
"POST",
body=body,
headers=headers,
stream=stream,
+ allow_redirects=allow_redirects,
+ ssl=ssl,
+ timeout=timeout,
+ )
+
+ async def put(
+ self,
+ url: URLTypes,
+ *,
+ body: BodyTypes = b"",
+ headers: HeaderTypes = None,
+ stream: bool = False,
+ allow_redirects: bool = True,
+ ssl: SSLConfig = None,
+ timeout: TimeoutConfig = None,
+ ) -> Response:
+ return await self.request(
+ "PUT",
+ url,
+ body=body,
+ headers=headers,
+ stream=stream,
+ allow_redirects=allow_redirects,
+ ssl=ssl,
+ timeout=timeout,
+ )
+
+ async def patch(
+ self,
+ url: URLTypes,
+ *,
+ body: BodyTypes = b"",
+ headers: HeaderTypes = None,
+ stream: bool = False,
+ allow_redirects: bool = True,
+ ssl: SSLConfig = None,
+ timeout: TimeoutConfig = None,
+ ) -> Response:
+ return await self.request(
+ "PATCH",
+ url,
+ body=body,
+ headers=headers,
+ stream=stream,
+ allow_redirects=allow_redirects,
+ ssl=ssl,
+ timeout=timeout,
+ )
+
+ async def delete(
+ self,
+ url: URLTypes,
+ *,
+ body: BodyTypes = b"",
+ headers: HeaderTypes = None,
+ stream: bool = False,
+ allow_redirects: bool = True,
+ ssl: SSLConfig = None,
+ timeout: TimeoutConfig = None,
+ ) -> Response:
+ return await self.request(
+ "DELETE",
+ url,
+ body=body,
+ headers=headers,
+ stream=stream,
+ allow_redirects=allow_redirects,
ssl=ssl,
timeout=timeout,
)
*,
stream: bool = False,
allow_redirects: bool = True,
- ssl: typing.Optional[SSLConfig] = None,
- timeout: typing.Optional[TimeoutConfig] = None,
+ ssl: SSLConfig = None,
+ timeout: TimeoutConfig = None,
) -> Response:
- options = {"stream": stream} # type: typing.Dict[str, typing.Any]
+ options = {
+ "stream": stream,
+ "allow_redirects": allow_redirects,
+ } # type: typing.Dict[str, typing.Any]
+
if ssl is not None:
options["ssl"] = ssl
if timeout is not None:
options["timeout"] = timeout
+
return await self.adapter.send(request, **options)
async def close(self) -> None:
context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
+ context.verify_mode = ssl.CERT_REQUIRED
+
context.options |= ssl.OP_NO_SSLv2
context.options |= ssl.OP_NO_SSLv3
context.options |= ssl.OP_NO_COMPRESSION
event = await self._receive_event(timeout)
assert isinstance(event, h11.Response)
- reason = event.reason.decode("latin1")
+ reason_phrase = event.reason.decode("latin1")
status_code = event.status_code
headers = event.headers
body = self._body_iter(timeout)
response = Response(
status_code=status_code,
- reason=reason,
+ reason_phrase=reason_phrase,
protocol="HTTP/1.1",
headers=headers,
body=body,
-import http
import typing
from urllib.parse import urlsplit
MultiDecoder,
)
from .exceptions import ResponseClosed, StreamConsumed
-from .utils import normalize_header_key, normalize_header_value
+from .status_codes import codes
+from .utils import get_reason_phrase, normalize_header_key, normalize_header_value
+
+URLTypes = typing.Union["URL", str]
+
+HeaderTypes = typing.Union[
+ "Headers",
+ typing.Dict[typing.AnyStr, typing.AnyStr],
+ typing.List[typing.Tuple[typing.AnyStr, typing.AnyStr]],
+]
+
+BodyTypes = typing.Union[bytes, typing.AsyncIterator[bytes]]
class URL:
- def __init__(self, url: str = "") -> None:
- self.components = urlsplit(url)
+ def __init__(self, url: URLTypes) -> None:
+ if isinstance(url, str):
+ self.components = urlsplit(url)
+ else:
+ self.components = url.components
+
if not self.components.scheme:
raise ValueError("No scheme included in URL.")
if self.components.scheme not in ("http", "https"):
return hash((self.is_ssl, self.hostname, self.port))
-HeaderTypes = typing.Union[
- "Headers",
- typing.Dict[typing.AnyStr, typing.AnyStr],
- typing.List[typing.Tuple[typing.AnyStr, typing.AnyStr]],
-]
-
-
class Headers(typing.MutableMapping[str, str]):
"""
A case-insensitive multidict.
url: typing.Union[str, URL],
*,
headers: HeaderTypes = None,
- body: typing.Union[bytes, typing.AsyncIterator[bytes]] = b"",
+ body: BodyTypes = b"",
):
self.method = method.upper()
self.url = URL(url) if isinstance(url, str) else url
self,
status_code: int,
*,
- reason: typing.Optional[str] = None,
- protocol: typing.Optional[str] = None,
- headers: typing.List[typing.Tuple[bytes, bytes]] = [],
- body: typing.Union[bytes, typing.AsyncIterator[bytes]] = b"",
+ reason_phrase: str = None,
+ protocol: str = None,
+ headers: HeaderTypes = None,
+ body: BodyTypes = b"",
on_close: typing.Callable = None,
request: Request = None,
history: typing.List["Response"] = None,
):
self.status_code = status_code
- if not reason:
- try:
- self.reason = http.HTTPStatus(status_code).phrase
- except ValueError as exc:
- self.reason = ""
+ if reason_phrase is None:
+ self.reason_phrase = get_reason_phrase(status_code)
else:
- self.reason = reason
+ self.reason_phrase = reason_phrase
self.protocol = protocol
self.headers = Headers(headers)
self.on_close = on_close
@property
def is_redirect(self) -> bool:
return (
- self.status_code in (301, 302, 303, 307, 308) and "location" in self.headers
+ self.status_code
+ in (
+ codes.moved_permanently,
+ codes.found,
+ codes.see_other,
+ codes.temporary_redirect,
+ codes.permanent_redirect,
+ )
+ and "location" in self.headers
)
return self._response.status_code
@property
- def reason(self) -> str:
- return self._response.reason
+ def reason_phrase(self) -> str:
+ return self._response.reason_phrase
@property
def headers(self) -> Headers:
+import http
import typing
from urllib.parse import quote
if isinstance(value, bytes):
return value
return value.encode("latin-1")
+
+
+def get_reason_phrase(status_code: int) -> str:
+ """
+ Return an HTTP reason phrase, eg. "OK" for 200, or "Not Found" for 404.
+ """
+ try:
+ return http.HTTPStatus(status_code).phrase
+ except ValueError as exc:
+ return ""
@pytest.mark.asyncio
async def test_get(server):
+ url = "http://127.0.0.1:8000/"
async with httpcore.Client() as client:
- response = await client.request("GET", "http://127.0.0.1:8000/")
+ response = await client.get(url)
assert response.status_code == 200
assert response.body == b"Hello, world!"
@pytest.mark.asyncio
async def test_post(server):
+ url = "http://127.0.0.1:8000/"
async with httpcore.Client() as client:
- response = await client.request(
- "POST", "http://127.0.0.1:8000/", body=b"Hello, world!"
- )
+ response = await client.post(url, body=b"Hello, world!")
assert response.status_code == 200
+import ssl
+
+import pytest
+
import httpcore
+@pytest.mark.asyncio
+async def test_load_ssl_config():
+ ssl_config = httpcore.SSLConfig()
+ context = await ssl_config.load_ssl_context()
+ assert context.verify_mode == ssl.VerifyMode.CERT_REQUIRED
+
+
+@pytest.mark.asyncio
+async def test_load_ssl_config_no_verify(verify=False):
+ ssl_config = httpcore.SSLConfig(verify=False)
+ context = await ssl_config.load_ssl_context()
+ assert context.verify_mode == ssl.VerifyMode.CERT_NONE
+
+
def test_ssl_repr():
ssl = httpcore.SSLConfig(verify=False)
assert repr(ssl) == "SSLConfig(cert=None, verify=False)"
def test_response():
response = httpcore.Response(200, body=b"Hello, world!")
assert response.status_code == 200
- assert response.reason == "OK"
+ assert response.reason_phrase == "OK"
assert response.body == b"Hello, world!"
assert response.is_closed
def test_unknown_status_code():
response = httpcore.Response(600)
assert response.status_code == 600
- assert response.reason == ""
+ assert response.reason_phrase == ""