## Feature support
* `HTTP/1.1` and `HTTP/2` Support.
-* `async`/`await` support for non-thread-blocking HTTP requests.
+* `async`/`await` support for non-blocking HTTP requests.
+* Strict timeouts everywhere by default.
* Fully type annotated.
* 100% test coverage. *TODO - Almost there.*
-from .backends.sync import SyncClient
-from .client import Client
+from .client import AsyncClient, Client
from .config import PoolLimits, SSLConfig, TimeoutConfig
from .constants import Protocol, codes
from .dispatch.connection import HTTPConnection
+++ /dev/null
-import asyncio
-import typing
-from types import TracebackType
-
-from ..client import Client
-from ..config import (
- DEFAULT_MAX_REDIRECTS,
- DEFAULT_POOL_LIMITS,
- DEFAULT_SSL_CONFIG,
- DEFAULT_TIMEOUT_CONFIG,
- PoolLimits,
- SSLConfig,
- TimeoutConfig,
-)
-from ..models import (
- URL,
- Headers,
- HeaderTypes,
- QueryParamTypes,
- Request,
- RequestData,
- Response,
- URLTypes,
-)
-
-
-class SyncResponse:
- def __init__(self, response: Response, loop: asyncio.AbstractEventLoop):
- self._response = response
- self._loop = loop
-
- @property
- def status_code(self) -> int:
- return self._response.status_code
-
- @property
- def reason_phrase(self) -> str:
- return self._response.reason_phrase
-
- @property
- def protocol(self) -> typing.Optional[str]:
- return self._response.protocol
-
- @property
- def headers(self) -> Headers:
- return self._response.headers
-
- @property
- def content(self) -> bytes:
- return self._response.content
-
- @property
- def text(self) -> str:
- return self._response.text
-
- def read(self) -> bytes:
- return self._loop.run_until_complete(self._response.read())
-
- def stream(self) -> typing.Iterator[bytes]:
- inner = self._response.stream()
- while True:
- try:
- yield self._loop.run_until_complete(inner.__anext__())
- except StopAsyncIteration as exc:
- break
-
- def raw(self) -> typing.Iterator[bytes]:
- inner = self._response.raw()
- while True:
- try:
- yield self._loop.run_until_complete(inner.__anext__())
- except StopAsyncIteration as exc:
- break
-
- def close(self) -> None:
- return self._loop.run_until_complete(self._response.close())
-
- def __repr__(self) -> str:
- class_name = self.__class__.__name__
- return f"<{class_name}(status_code={self.status_code})>"
-
-
-class SyncClient:
- def __init__(
- self,
- ssl: SSLConfig = DEFAULT_SSL_CONFIG,
- timeout: TimeoutConfig = DEFAULT_TIMEOUT_CONFIG,
- pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
- max_redirects: int = DEFAULT_MAX_REDIRECTS,
- ) -> None:
- self._client = Client(
- ssl=ssl,
- timeout=timeout,
- pool_limits=pool_limits,
- max_redirects=max_redirects,
- )
- self._loop = asyncio.new_event_loop()
-
- def request(
- self,
- method: str,
- url: URLTypes,
- *,
- data: RequestData = b"",
- query_params: QueryParamTypes = None,
- headers: HeaderTypes = None,
- stream: bool = False,
- allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
- ) -> SyncResponse:
- request = Request(
- method, url, data=data, query_params=query_params, headers=headers
- )
- self.prepare_request(request)
- response = self.send(
- request,
- stream=stream,
- allow_redirects=allow_redirects,
- ssl=ssl,
- timeout=timeout,
- )
- return response
-
- def get(
- self,
- url: URLTypes,
- *,
- query_params: QueryParamTypes = None,
- headers: HeaderTypes = None,
- stream: bool = False,
- allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
- ) -> SyncResponse:
- return self.request(
- "GET",
- url,
- headers=headers,
- stream=stream,
- allow_redirects=allow_redirects,
- ssl=ssl,
- timeout=timeout,
- )
-
- def options(
- self,
- url: URLTypes,
- *,
- query_params: QueryParamTypes = None,
- headers: HeaderTypes = None,
- stream: bool = False,
- allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
- ) -> SyncResponse:
- return self.request(
- "OPTIONS",
- url,
- headers=headers,
- stream=stream,
- allow_redirects=allow_redirects,
- ssl=ssl,
- timeout=timeout,
- )
-
- def head(
- self,
- url: URLTypes,
- *,
- query_params: QueryParamTypes = None,
- headers: HeaderTypes = None,
- stream: bool = False,
- allow_redirects: bool = False, # Note: Differs to usual default.
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
- ) -> SyncResponse:
- return self.request(
- "HEAD",
- url,
- headers=headers,
- stream=stream,
- allow_redirects=allow_redirects,
- ssl=ssl,
- timeout=timeout,
- )
-
- def post(
- self,
- url: URLTypes,
- *,
- data: RequestData = b"",
- query_params: QueryParamTypes = None,
- headers: HeaderTypes = None,
- stream: bool = False,
- allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
- ) -> SyncResponse:
- return self.request(
- "POST",
- url,
- data=data,
- headers=headers,
- stream=stream,
- allow_redirects=allow_redirects,
- ssl=ssl,
- timeout=timeout,
- )
-
- def put(
- self,
- url: URLTypes,
- *,
- data: RequestData = b"",
- query_params: QueryParamTypes = None,
- headers: HeaderTypes = None,
- stream: bool = False,
- allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
- ) -> SyncResponse:
- return self.request(
- "PUT",
- url,
- data=data,
- headers=headers,
- stream=stream,
- allow_redirects=allow_redirects,
- ssl=ssl,
- timeout=timeout,
- )
-
- def patch(
- self,
- url: URLTypes,
- *,
- data: RequestData = b"",
- query_params: QueryParamTypes = None,
- headers: HeaderTypes = None,
- stream: bool = False,
- allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
- ) -> SyncResponse:
- return self.request(
- "PATCH",
- url,
- data=data,
- headers=headers,
- stream=stream,
- allow_redirects=allow_redirects,
- ssl=ssl,
- timeout=timeout,
- )
-
- def delete(
- self,
- url: URLTypes,
- *,
- data: RequestData = b"",
- query_params: QueryParamTypes = None,
- headers: HeaderTypes = None,
- stream: bool = False,
- allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
- ) -> SyncResponse:
- return self.request(
- "DELETE",
- url,
- data=data,
- headers=headers,
- stream=stream,
- allow_redirects=allow_redirects,
- ssl=ssl,
- timeout=timeout,
- )
-
- def prepare_request(self, request: Request) -> None:
- self._client.prepare_request(request)
-
- def send(
- self,
- request: Request,
- *,
- stream: bool = False,
- allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
- ) -> SyncResponse:
- response = self._loop.run_until_complete(
- self._client.send(
- request,
- stream=stream,
- allow_redirects=allow_redirects,
- ssl=ssl,
- timeout=timeout,
- )
- )
- return SyncResponse(response, self._loop)
-
- def close(self) -> None:
- self._loop.run_until_complete(self._client.close())
-
- def __enter__(self) -> "SyncClient":
- return self
-
- def __exit__(
- self,
- exc_type: typing.Type[BaseException] = None,
- exc_value: BaseException = None,
- traceback: TracebackType = None,
- ) -> None:
- self.close()
+import asyncio
import typing
from types import TracebackType
Request,
RequestData,
Response,
+ SyncResponse,
URLTypes,
)
-class Client:
+class AsyncClient:
def __init__(
self,
ssl: SSLConfig = DEFAULT_SSL_CONFIG,
async def close(self) -> None:
await self.dispatch.close()
- async def __aenter__(self) -> "Client":
+ async def __aenter__(self) -> "AsyncClient":
return self
async def __aexit__(
traceback: TracebackType = None,
) -> None:
await self.close()
+
+
+class Client:
+ def __init__(
+ self,
+ ssl: SSLConfig = DEFAULT_SSL_CONFIG,
+ timeout: TimeoutConfig = DEFAULT_TIMEOUT_CONFIG,
+ pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
+ max_redirects: int = DEFAULT_MAX_REDIRECTS,
+ dispatch: Dispatcher = None,
+ ) -> None:
+ self._client = AsyncClient(
+ ssl=ssl,
+ timeout=timeout,
+ pool_limits=pool_limits,
+ max_redirects=max_redirects,
+ dispatch=dispatch,
+ )
+ self._loop = asyncio.new_event_loop()
+
+ def request(
+ self,
+ method: str,
+ url: URLTypes,
+ *,
+ data: RequestData = b"",
+ query_params: QueryParamTypes = None,
+ headers: HeaderTypes = None,
+ stream: bool = False,
+ allow_redirects: bool = True,
+ ssl: SSLConfig = None,
+ timeout: TimeoutConfig = None,
+ ) -> SyncResponse:
+ request = Request(
+ method, url, data=data, query_params=query_params, headers=headers
+ )
+ self.prepare_request(request)
+ response = self.send(
+ request,
+ stream=stream,
+ allow_redirects=allow_redirects,
+ ssl=ssl,
+ timeout=timeout,
+ )
+ return response
+
+ def get(
+ self,
+ url: URLTypes,
+ *,
+ query_params: QueryParamTypes = None,
+ headers: HeaderTypes = None,
+ stream: bool = False,
+ allow_redirects: bool = True,
+ ssl: SSLConfig = None,
+ timeout: TimeoutConfig = None,
+ ) -> SyncResponse:
+ return self.request(
+ "GET",
+ url,
+ headers=headers,
+ stream=stream,
+ allow_redirects=allow_redirects,
+ ssl=ssl,
+ timeout=timeout,
+ )
+
+ def options(
+ self,
+ url: URLTypes,
+ *,
+ query_params: QueryParamTypes = None,
+ headers: HeaderTypes = None,
+ stream: bool = False,
+ allow_redirects: bool = True,
+ ssl: SSLConfig = None,
+ timeout: TimeoutConfig = None,
+ ) -> SyncResponse:
+ return self.request(
+ "OPTIONS",
+ url,
+ headers=headers,
+ stream=stream,
+ allow_redirects=allow_redirects,
+ ssl=ssl,
+ timeout=timeout,
+ )
+
+ def head(
+ self,
+ url: URLTypes,
+ *,
+ query_params: QueryParamTypes = None,
+ headers: HeaderTypes = None,
+ stream: bool = False,
+ allow_redirects: bool = False, # Note: Differs to usual default.
+ ssl: SSLConfig = None,
+ timeout: TimeoutConfig = None,
+ ) -> SyncResponse:
+ return self.request(
+ "HEAD",
+ url,
+ headers=headers,
+ stream=stream,
+ allow_redirects=allow_redirects,
+ ssl=ssl,
+ timeout=timeout,
+ )
+
+ def post(
+ self,
+ url: URLTypes,
+ *,
+ data: RequestData = b"",
+ query_params: QueryParamTypes = None,
+ headers: HeaderTypes = None,
+ stream: bool = False,
+ allow_redirects: bool = True,
+ ssl: SSLConfig = None,
+ timeout: TimeoutConfig = None,
+ ) -> SyncResponse:
+ return self.request(
+ "POST",
+ url,
+ data=data,
+ headers=headers,
+ stream=stream,
+ allow_redirects=allow_redirects,
+ ssl=ssl,
+ timeout=timeout,
+ )
+
+ def put(
+ self,
+ url: URLTypes,
+ *,
+ data: RequestData = b"",
+ query_params: QueryParamTypes = None,
+ headers: HeaderTypes = None,
+ stream: bool = False,
+ allow_redirects: bool = True,
+ ssl: SSLConfig = None,
+ timeout: TimeoutConfig = None,
+ ) -> SyncResponse:
+ return self.request(
+ "PUT",
+ url,
+ data=data,
+ headers=headers,
+ stream=stream,
+ allow_redirects=allow_redirects,
+ ssl=ssl,
+ timeout=timeout,
+ )
+
+ def patch(
+ self,
+ url: URLTypes,
+ *,
+ data: RequestData = b"",
+ query_params: QueryParamTypes = None,
+ headers: HeaderTypes = None,
+ stream: bool = False,
+ allow_redirects: bool = True,
+ ssl: SSLConfig = None,
+ timeout: TimeoutConfig = None,
+ ) -> SyncResponse:
+ return self.request(
+ "PATCH",
+ url,
+ data=data,
+ headers=headers,
+ stream=stream,
+ allow_redirects=allow_redirects,
+ ssl=ssl,
+ timeout=timeout,
+ )
+
+ def delete(
+ self,
+ url: URLTypes,
+ *,
+ data: RequestData = b"",
+ query_params: QueryParamTypes = None,
+ headers: HeaderTypes = None,
+ stream: bool = False,
+ allow_redirects: bool = True,
+ ssl: SSLConfig = None,
+ timeout: TimeoutConfig = None,
+ ) -> SyncResponse:
+ return self.request(
+ "DELETE",
+ url,
+ data=data,
+ headers=headers,
+ stream=stream,
+ allow_redirects=allow_redirects,
+ ssl=ssl,
+ timeout=timeout,
+ )
+
+ def prepare_request(self, request: Request) -> None:
+ self._client.prepare_request(request)
+
+ def send(
+ self,
+ request: Request,
+ *,
+ stream: bool = False,
+ allow_redirects: bool = True,
+ ssl: SSLConfig = None,
+ timeout: TimeoutConfig = None,
+ ) -> SyncResponse:
+ response = self._loop.run_until_complete(
+ self._client.send(
+ request,
+ stream=stream,
+ allow_redirects=allow_redirects,
+ ssl=ssl,
+ timeout=timeout,
+ )
+ )
+ return SyncResponse(response, self._loop)
+
+ def close(self) -> None:
+ self._loop.run_until_complete(self._client.close())
+
+ def __enter__(self) -> "Client":
+ return self
+
+ def __exit__(
+ self,
+ exc_type: typing.Type[BaseException] = None,
+ exc_value: BaseException = None,
+ traceback: TracebackType = None,
+ ) -> None:
+ self.close()
+import asyncio
import cgi
import typing
from urllib.parse import parse_qsl, urlencode
and "location" in self.headers
)
- def __repr__(self) -> str:
- class_name = self.__class__.__name__
- return f"<{class_name}({self.status_code}, {self.reason_phrase!r})>"
-
def raise_for_status(self) -> None:
"""
Raise the `HttpError` if one occurred.
if message:
raise HttpError(message)
+
+ def __repr__(self) -> str:
+ class_name = self.__class__.__name__
+ return f"<{class_name}({self.status_code}, {self.reason_phrase!r})>"
+
+
+class SyncResponse:
+ """
+ A thread-synchronous response. This class proxies onto a `Response`
+ instance, providing standard synchronous interfaces where required.
+ """
+
+ def __init__(self, response: Response, loop: asyncio.AbstractEventLoop):
+ self._response = response
+ self._loop = loop
+
+ @property
+ def status_code(self) -> int:
+ return self._response.status_code
+
+ @property
+ def reason_phrase(self) -> str:
+ return self._response.reason_phrase
+
+ @property
+ def protocol(self) -> typing.Optional[str]:
+ return self._response.protocol
+
+ @property
+ def url(self) -> typing.Optional[URL]:
+ return self._response.url
+
+ @property
+ def request(self) -> typing.Optional[Request]:
+ return self._response.request
+
+ @property
+ def headers(self) -> Headers:
+ return self._response.headers
+
+ @property
+ def content(self) -> bytes:
+ return self._response.content
+
+ @property
+ def text(self) -> str:
+ return self._response.text
+
+ @property
+ def encoding(self) -> str:
+ return self._response.encoding
+
+ @property
+ def is_redirect(self) -> bool:
+ return self._response.is_redirect
+
+ def raise_for_status(self) -> None:
+ return self._response.raise_for_status()
+
+ def read(self) -> bytes:
+ return self._loop.run_until_complete(self._response.read())
+
+ def stream(self) -> typing.Iterator[bytes]:
+ inner = self._response.stream()
+ while True:
+ try:
+ yield self._loop.run_until_complete(inner.__anext__())
+ except StopAsyncIteration as exc:
+ break
+
+ def raw(self) -> typing.Iterator[bytes]:
+ inner = self._response.raw()
+ while True:
+ try:
+ yield self._loop.run_until_complete(inner.__anext__())
+ except StopAsyncIteration as exc:
+ break
+
+ def close(self) -> None:
+ return self._loop.run_until_complete(self._response.close())
--- /dev/null
+import pytest
+
+import httpcore
+
+
+@pytest.mark.asyncio
+async def test_get(server):
+ url = "http://127.0.0.1:8000/"
+ async with httpcore.AsyncClient() as client:
+ response = await client.get(url)
+ assert response.status_code == 200
+ assert response.text == "Hello, world!"
+
+
+@pytest.mark.asyncio
+async def test_post(server):
+ url = "http://127.0.0.1:8000/"
+ async with httpcore.AsyncClient() as client:
+ response = await client.post(url, data=b"Hello, world!")
+ assert response.status_code == 200
+
+
+@pytest.mark.asyncio
+async def test_stream_response(server):
+ async with httpcore.AsyncClient() as client:
+ response = await client.request("GET", "http://127.0.0.1:8000/", stream=True)
+ assert response.status_code == 200
+ body = await response.read()
+ assert body == b"Hello, world!"
+ assert response.content == b"Hello, world!"
+
+
+@pytest.mark.asyncio
+async def test_access_content_stream_response(server):
+ async with httpcore.AsyncClient() as client:
+ response = await client.request("GET", "http://127.0.0.1:8000/", stream=True)
+ assert response.status_code == 200
+ with pytest.raises(httpcore.ResponseNotRead):
+ response.content
+
+
+@pytest.mark.asyncio
+async def test_stream_request(server):
+ async def hello_world():
+ yield b"Hello, "
+ yield b"world!"
+
+ async with httpcore.AsyncClient() as client:
+ response = await client.request(
+ "POST", "http://127.0.0.1:8000/", data=hello_world()
+ )
+ assert response.status_code == 200
+
+
+@pytest.mark.asyncio
+async def test_raise_for_status(server):
+ async with httpcore.AsyncClient() as client:
+ for status_code in (200, 400, 404, 500, 505):
+ response = await client.request(
+ "GET", f"http://127.0.0.1:8000/status/{status_code}"
+ )
+
+ if 400 <= status_code < 600:
+ with pytest.raises(httpcore.exceptions.HttpError):
+ response.raise_for_status()
+ else:
+ assert response.raise_for_status() is None
+import asyncio
+import functools
+
import pytest
import httpcore
-@pytest.mark.asyncio
-async def test_get(server):
- url = "http://127.0.0.1:8000/"
- async with httpcore.Client() as client:
- response = await client.get(url)
- assert response.status_code == 200
- assert response.text == "Hello, world!"
+def threadpool(func):
+ """
+ Our sync tests should run in seperate thread to the uvicorn server.
+ """
+ @functools.wraps(func)
+ async def wrapped(*args, **kwargs):
+ nonlocal func
-@pytest.mark.asyncio
-async def test_post(server):
- url = "http://127.0.0.1:8000/"
- async with httpcore.Client() as client:
- response = await client.post(url, data=b"Hello, world!")
- assert response.status_code == 200
+ loop = asyncio.get_event_loop()
+ if kwargs:
+ func = functools.partial(func, **kwargs)
+ await loop.run_in_executor(None, func, *args)
+ return pytest.mark.asyncio(wrapped)
-@pytest.mark.asyncio
-async def test_stream_response(server):
- async with httpcore.Client() as client:
- response = await client.request("GET", "http://127.0.0.1:8000/", stream=True)
+
+@threadpool
+def test_get(server):
+ with httpcore.Client() as http:
+ response = http.get("http://127.0.0.1:8000/")
assert response.status_code == 200
- body = await response.read()
- assert body == b"Hello, world!"
assert response.content == b"Hello, world!"
+ assert response.text == "Hello, world!"
-@pytest.mark.asyncio
-async def test_access_content_stream_response(server):
- async with httpcore.Client() as client:
- response = await client.request("GET", "http://127.0.0.1:8000/", stream=True)
+@threadpool
+def test_post(server):
+ with httpcore.Client() as http:
+ response = http.post("http://127.0.0.1:8000/", data=b"Hello, world!")
assert response.status_code == 200
- with pytest.raises(httpcore.ResponseNotRead):
- response.content
+ assert response.reason_phrase == "OK"
-@pytest.mark.asyncio
-async def test_stream_request(server):
- async def hello_world():
- yield b"Hello, "
- yield b"world!"
-
- async with httpcore.Client() as client:
- response = await client.request(
- "POST", "http://127.0.0.1:8000/", data=hello_world()
- )
+@threadpool
+def test_stream_response(server):
+ with httpcore.Client() as http:
+ response = http.get("http://127.0.0.1:8000/", stream=True)
assert response.status_code == 200
+ content = response.read()
+ assert content == b"Hello, world!"
+
+@threadpool
+def test_stream_iterator(server):
+ with httpcore.Client() as http:
+ response = http.get("http://127.0.0.1:8000/", stream=True)
+ assert response.status_code == 200
+ body = b""
+ for chunk in response.stream():
+ body += chunk
+ assert body == b"Hello, world!"
-@pytest.mark.asyncio
-async def test_raise_for_status(server):
- async with httpcore.Client() as client:
- for status_code in (200, 400, 404, 500, 505):
- response = await client.request(
- "GET", f"http://127.0.0.1:8000/status/{status_code}"
- )
- if 400 <= status_code < 600:
- with pytest.raises(httpcore.exceptions.HttpError):
- response.raise_for_status()
- else:
- assert response.raise_for_status() is None
+@threadpool
+def test_raw_iterator(server):
+ with httpcore.Client() as http:
+ response = http.get("http://127.0.0.1:8000/", stream=True)
+ assert response.status_code == 200
+ body = b""
+ for chunk in response.raw():
+ body += chunk
+ assert body == b"Hello, world!"
from httpcore import (
URL,
- Client,
+ AsyncClient,
Dispatcher,
RedirectBodyUnavailable,
RedirectLoop,
@pytest.mark.asyncio
async def test_redirect_301():
- client = Client(dispatch=MockDispatch())
+ client = AsyncClient(dispatch=MockDispatch())
response = await client.request("POST", "https://example.org/redirect_301")
assert response.status_code == codes.ok
assert response.url == URL("https://example.org/")
@pytest.mark.asyncio
async def test_redirect_302():
- client = Client(dispatch=MockDispatch())
+ client = AsyncClient(dispatch=MockDispatch())
response = await client.request("POST", "https://example.org/redirect_302")
assert response.status_code == codes.ok
assert response.url == URL("https://example.org/")
@pytest.mark.asyncio
async def test_redirect_303():
- client = Client(dispatch=MockDispatch())
+ client = AsyncClient(dispatch=MockDispatch())
response = await client.request("GET", "https://example.org/redirect_303")
assert response.status_code == codes.ok
assert response.url == URL("https://example.org/")
@pytest.mark.asyncio
async def test_disallow_redirects():
- client = Client(dispatch=MockDispatch())
+ client = AsyncClient(dispatch=MockDispatch())
response = await client.request(
"POST", "https://example.org/redirect_303", allow_redirects=False
)
@pytest.mark.asyncio
async def test_relative_redirect():
- client = Client(dispatch=MockDispatch())
+ client = AsyncClient(dispatch=MockDispatch())
response = await client.request("GET", "https://example.org/relative_redirect")
assert response.status_code == codes.ok
assert response.url == URL("https://example.org/")
@pytest.mark.asyncio
async def test_no_scheme_redirect():
- client = Client(dispatch=MockDispatch())
+ client = AsyncClient(dispatch=MockDispatch())
response = await client.request("GET", "https://example.org/no_scheme_redirect")
assert response.status_code == codes.ok
assert response.url == URL("https://example.org/")
@pytest.mark.asyncio
async def test_fragment_redirect():
- client = Client(dispatch=MockDispatch())
+ client = AsyncClient(dispatch=MockDispatch())
url = "https://example.org/relative_redirect#fragment"
response = await client.request("GET", url)
assert response.status_code == codes.ok
@pytest.mark.asyncio
async def test_multiple_redirects():
- client = Client(dispatch=MockDispatch())
+ client = AsyncClient(dispatch=MockDispatch())
url = "https://example.org/multiple_redirects?count=20"
response = await client.request("GET", url)
assert response.status_code == codes.ok
@pytest.mark.asyncio
async def test_too_many_redirects():
- client = Client(dispatch=MockDispatch())
+ client = AsyncClient(dispatch=MockDispatch())
with pytest.raises(TooManyRedirects):
await client.request("GET", "https://example.org/multiple_redirects?count=21")
@pytest.mark.asyncio
async def test_too_many_redirects_calling_next():
- client = Client(dispatch=MockDispatch())
+ client = AsyncClient(dispatch=MockDispatch())
url = "https://example.org/multiple_redirects?count=21"
response = await client.request("GET", url, allow_redirects=False)
with pytest.raises(TooManyRedirects):
@pytest.mark.asyncio
async def test_redirect_loop():
- client = Client(dispatch=MockDispatch())
+ client = AsyncClient(dispatch=MockDispatch())
with pytest.raises(RedirectLoop):
await client.request("GET", "https://example.org/redirect_loop")
@pytest.mark.asyncio
async def test_redirect_loop_calling_next():
- client = Client(dispatch=MockDispatch())
+ client = AsyncClient(dispatch=MockDispatch())
url = "https://example.org/redirect_loop"
response = await client.request("GET", url, allow_redirects=False)
with pytest.raises(RedirectLoop):
@pytest.mark.asyncio
async def test_cross_domain_redirect():
- client = Client(dispatch=MockDispatch())
+ client = AsyncClient(dispatch=MockDispatch())
url = "https://example.com/cross_domain"
headers = {"Authorization": "abc"}
response = await client.request("GET", url, headers=headers)
@pytest.mark.asyncio
async def test_same_domain_redirect():
- client = Client(dispatch=MockDispatch())
+ client = AsyncClient(dispatch=MockDispatch())
url = "https://example.org/cross_domain"
headers = {"Authorization": "abc"}
response = await client.request("GET", url, headers=headers)
@pytest.mark.asyncio
async def test_body_redirect():
- client = Client(dispatch=MockDispatch())
+ client = AsyncClient(dispatch=MockDispatch())
url = "https://example.org/redirect_body"
data = b"Example request body"
response = await client.request("POST", url, data=data)
@pytest.mark.asyncio
async def test_cannot_redirect_streaming_body():
- client = Client(dispatch=MockDispatch())
+ client = AsyncClient(dispatch=MockDispatch())
url = "https://example.org/redirect_body"
async def streaming_body():
+++ /dev/null
-import asyncio
-import functools
-
-import pytest
-
-import httpcore
-
-
-def threadpool(func):
- """
- Our sync tests should run in seperate thread to the uvicorn server.
- """
-
- @functools.wraps(func)
- async def wrapped(*args, **kwargs):
- nonlocal func
-
- loop = asyncio.get_event_loop()
- if kwargs:
- func = functools.partial(func, **kwargs)
- await loop.run_in_executor(None, func, *args)
-
- return pytest.mark.asyncio(wrapped)
-
-
-@threadpool
-def test_get(server):
- with httpcore.SyncClient() as http:
- response = http.get("http://127.0.0.1:8000/")
- assert response.status_code == 200
- assert response.content == b"Hello, world!"
- assert response.text == "Hello, world!"
-
-
-@threadpool
-def test_post(server):
- with httpcore.SyncClient() as http:
- response = http.post("http://127.0.0.1:8000/", data=b"Hello, world!")
- assert response.status_code == 200
- assert response.reason_phrase == "OK"
-
-
-@threadpool
-def test_stream_response(server):
- with httpcore.SyncClient() as http:
- response = http.get("http://127.0.0.1:8000/", stream=True)
- assert response.status_code == 200
- content = response.read()
- assert content == b"Hello, world!"
-
-
-@threadpool
-def test_stream_iterator(server):
- with httpcore.SyncClient() as http:
- response = http.get("http://127.0.0.1:8000/", stream=True)
- assert response.status_code == 200
- body = b""
- for chunk in response.stream():
- body += chunk
- assert body == b"Hello, world!"
-
-
-@threadpool
-def test_raw_iterator(server):
- with httpcore.SyncClient() as http:
- response = http.get("http://127.0.0.1:8000/", stream=True)
- assert response.status_code == 200
- body = b""
- for chunk in response.raw():
- body += chunk
- assert body == b"Hello, world!"
import pytest
from httpcore import (
- Client,
+ AsyncClient,
ConnectTimeout,
PoolLimits,
PoolTimeout,
async def test_read_timeout(server):
timeout = TimeoutConfig(read_timeout=0.0001)
- async with Client(timeout=timeout) as client:
+ async with AsyncClient(timeout=timeout) as client:
with pytest.raises(ReadTimeout):
await client.get("http://127.0.0.1:8000/slow_response")
async def test_connect_timeout(server):
timeout = TimeoutConfig(connect_timeout=0.0001)
- async with Client(timeout=timeout) as client:
+ async with AsyncClient(timeout=timeout) as client:
with pytest.raises(ConnectTimeout):
# See https://stackoverflow.com/questions/100841/
await client.get("http://10.255.255.1/")
async def test_pool_timeout(server):
pool_limits = PoolLimits(hard_limit=1, pool_timeout=0.0001)
- async with Client(pool_limits=pool_limits) as client:
+ async with AsyncClient(pool_limits=pool_limits) as client:
response = await client.get("http://127.0.0.1:8000/", stream=True)
with pytest.raises(PoolTimeout):