# concurrency backends.
# The sync client performs I/O on its own, so it doesn't need to support
# arbitrary concurrency backends.
- # Therefore, we kept the `backend` parameter (for testing/mocking), but enforce
- # that the concurrency backend derives from the asyncio one.
- if not isinstance(backend, AsyncioBackend):
- raise ValueError(
- "'Client' only supports asyncio-based concurrency backends"
- )
+ # Therefore, we keep the `backend` parameter (for testing/mocking), but require
+ # that the concurrency backend relies on asyncio.
+
+ if isinstance(backend, AsyncioBackend):
+ return
+
+ if hasattr(backend, "loop"):
+ # Most likely a proxy class.
+ return
+
+ raise ValueError("'Client' only supports asyncio-based concurrency backends")
def _async_request_data(
self, data: RequestData = None
import httpx
-@pytest.mark.asyncio
-async def test_get(server):
+async def test_get(server, backend):
url = "http://127.0.0.1:8000/"
- async with httpx.AsyncClient() as client:
+ async with httpx.AsyncClient(backend=backend) as client:
response = await client.get(url)
assert response.status_code == 200
assert response.text == "Hello, world!"
@pytest.mark.asyncio
-async def test_post(server):
+async def test_get_no_backend(server):
+ """
+ Verify that the client is capable of making a simple request if not given a backend.
+ """
url = "http://127.0.0.1:8000/"
async with httpx.AsyncClient() as client:
+ response = await client.get(url)
+ assert response.status_code == 200
+ assert response.text == "Hello, world!"
+ assert response.http_version == "HTTP/1.1"
+ assert response.headers
+ assert repr(response) == "<Response [200 OK]>"
+
+
+async def test_post(server, backend):
+ url = "http://127.0.0.1:8000/"
+ async with httpx.AsyncClient(backend=backend) as client:
response = await client.post(url, data=b"Hello, world!")
assert response.status_code == 200
-@pytest.mark.asyncio
-async def test_post_json(server):
+async def test_post_json(server, backend):
url = "http://127.0.0.1:8000/"
- async with httpx.AsyncClient() as client:
+ async with httpx.AsyncClient(backend=backend) as client:
response = await client.post(url, json={"text": "Hello, world!"})
assert response.status_code == 200
-@pytest.mark.asyncio
-async def test_stream_response(server):
- async with httpx.AsyncClient() as client:
+async def test_stream_response(server, backend):
+ async with httpx.AsyncClient(backend=backend) as client:
response = await client.request("GET", "http://127.0.0.1:8000/", stream=True)
assert response.status_code == 200
body = await response.read()
assert response.content == b"Hello, world!"
-@pytest.mark.asyncio
-async def test_access_content_stream_response(server):
- async with httpx.AsyncClient() as client:
+async def test_access_content_stream_response(server, backend):
+ async with httpx.AsyncClient(backend=backend) as client:
response = await client.request("GET", "http://127.0.0.1:8000/", stream=True)
assert response.status_code == 200
with pytest.raises(httpx.ResponseNotRead):
response.content
-@pytest.mark.asyncio
-async def test_stream_request(server):
+async def test_stream_request(server, backend):
async def hello_world():
yield b"Hello, "
yield b"world!"
- async with httpx.AsyncClient() as client:
+ async with httpx.AsyncClient(backend=backend) as client:
response = await client.request(
"POST", "http://127.0.0.1:8000/", data=hello_world()
)
assert response.status_code == 200
-@pytest.mark.asyncio
-async def test_raise_for_status(server):
- async with httpx.AsyncClient() as client:
+async def test_raise_for_status(server, backend):
+ async with httpx.AsyncClient(backend=backend) as client:
for status_code in (200, 400, 404, 500, 505):
response = await client.request(
"GET", f"http://127.0.0.1:8000/status/{status_code}"
assert response.raise_for_status() is None
-@pytest.mark.asyncio
-async def test_options(server):
+async def test_options(server, backend):
url = "http://127.0.0.1:8000/"
- async with httpx.AsyncClient() as client:
+ async with httpx.AsyncClient(backend=backend) as client:
response = await client.options(url)
assert response.status_code == 200
assert response.text == "Hello, world!"
-@pytest.mark.asyncio
-async def test_head(server):
+async def test_head(server, backend):
url = "http://127.0.0.1:8000/"
- async with httpx.AsyncClient() as client:
+ async with httpx.AsyncClient(backend=backend) as client:
response = await client.head(url)
assert response.status_code == 200
assert response.text == ""
-@pytest.mark.asyncio
-async def test_put(server):
+async def test_put(server, backend):
url = "http://127.0.0.1:8000/"
- async with httpx.AsyncClient() as client:
+ async with httpx.AsyncClient(backend=backend) as client:
response = await client.put(url, data=b"Hello, world!")
assert response.status_code == 200
-@pytest.mark.asyncio
-async def test_patch(server):
+async def test_patch(server, backend):
url = "http://127.0.0.1:8000/"
- async with httpx.AsyncClient() as client:
+ async with httpx.AsyncClient(backend=backend) as client:
response = await client.patch(url, data=b"Hello, world!")
assert response.status_code == 200
-@pytest.mark.asyncio
-async def test_delete(server):
+async def test_delete(server, backend):
url = "http://127.0.0.1:8000/"
- async with httpx.AsyncClient() as client:
+ async with httpx.AsyncClient(backend=backend) as client:
response = await client.delete(url)
assert response.status_code == 200
assert response.text == "Hello, world!"
-@pytest.mark.asyncio
-async def test_100_continue(server):
+async def test_100_continue(server, backend):
url = "http://127.0.0.1:8000/echo_body"
headers = {"Expect": "100-continue"}
data = b"Echo request body"
- async with httpx.AsyncClient() as client:
+ async with httpx.AsyncClient(backend=backend) as client:
response = await client.post(url, headers=headers, data=data)
assert response.status_code == 200
return AsyncResponse(codes.OK, content=b"Hello, world!", request=request)
-@pytest.mark.asyncio
-async def test_redirect_301():
- client = AsyncClient(dispatch=MockDispatch())
+async def test_redirect_301(backend):
+ client = AsyncClient(dispatch=MockDispatch(), backend=backend)
response = await client.post("https://example.org/redirect_301")
assert response.status_code == codes.OK
assert response.url == URL("https://example.org/")
assert len(response.history) == 1
-@pytest.mark.asyncio
-async def test_redirect_302():
- client = AsyncClient(dispatch=MockDispatch())
+async def test_redirect_302(backend):
+ client = AsyncClient(dispatch=MockDispatch(), backend=backend)
response = await client.post("https://example.org/redirect_302")
assert response.status_code == codes.OK
assert response.url == URL("https://example.org/")
assert len(response.history) == 1
-@pytest.mark.asyncio
-async def test_redirect_303():
- client = AsyncClient(dispatch=MockDispatch())
+async def test_redirect_303(backend):
+ client = AsyncClient(dispatch=MockDispatch(), backend=backend)
response = await client.get("https://example.org/redirect_303")
assert response.status_code == codes.OK
assert response.url == URL("https://example.org/")
assert len(response.history) == 1
-@pytest.mark.asyncio
-async def test_disallow_redirects():
- client = AsyncClient(dispatch=MockDispatch())
+async def test_disallow_redirects(backend):
+ client = AsyncClient(dispatch=MockDispatch(), backend=backend)
response = await client.post(
"https://example.org/redirect_303", allow_redirects=False
)
assert len(response.history) == 1
-@pytest.mark.asyncio
-async def test_relative_redirect():
- client = AsyncClient(dispatch=MockDispatch())
+async def test_relative_redirect(backend):
+ client = AsyncClient(dispatch=MockDispatch(), backend=backend)
response = await client.get("https://example.org/relative_redirect")
assert response.status_code == codes.OK
assert response.url == URL("https://example.org/")
assert len(response.history) == 1
-@pytest.mark.asyncio
-async def test_no_scheme_redirect():
- client = AsyncClient(dispatch=MockDispatch())
+async def test_no_scheme_redirect(backend):
+ client = AsyncClient(dispatch=MockDispatch(), backend=backend)
response = await client.get("https://example.org/no_scheme_redirect")
assert response.status_code == codes.OK
assert response.url == URL("https://example.org/")
assert len(response.history) == 1
-@pytest.mark.asyncio
-async def test_fragment_redirect():
- client = AsyncClient(dispatch=MockDispatch())
+async def test_fragment_redirect(backend):
+ client = AsyncClient(dispatch=MockDispatch(), backend=backend)
response = await client.get("https://example.org/relative_redirect#fragment")
assert response.status_code == codes.OK
assert response.url == URL("https://example.org/#fragment")
assert len(response.history) == 1
-@pytest.mark.asyncio
-async def test_multiple_redirects():
- client = AsyncClient(dispatch=MockDispatch())
+async def test_multiple_redirects(backend):
+ client = AsyncClient(dispatch=MockDispatch(), backend=backend)
response = await client.get("https://example.org/multiple_redirects?count=20")
assert response.status_code == codes.OK
assert response.url == URL("https://example.org/multiple_redirects")
assert len(response.history[1].history) == 1
-@pytest.mark.asyncio
-async def test_too_many_redirects():
- client = AsyncClient(dispatch=MockDispatch())
+async def test_too_many_redirects(backend):
+ client = AsyncClient(dispatch=MockDispatch(), backend=backend)
with pytest.raises(TooManyRedirects):
await client.get("https://example.org/multiple_redirects?count=21")
-@pytest.mark.asyncio
-async def test_too_many_redirects_calling_next():
- client = AsyncClient(dispatch=MockDispatch())
+async def test_too_many_redirects_calling_next(backend):
+ client = AsyncClient(dispatch=MockDispatch(), backend=backend)
url = "https://example.org/multiple_redirects?count=21"
response = await client.get(url, allow_redirects=False)
with pytest.raises(TooManyRedirects):
response = await response.next()
-@pytest.mark.asyncio
-async def test_redirect_loop():
- client = AsyncClient(dispatch=MockDispatch())
+async def test_redirect_loop(backend):
+ client = AsyncClient(dispatch=MockDispatch(), backend=backend)
with pytest.raises(RedirectLoop):
await client.get("https://example.org/redirect_loop")
-@pytest.mark.asyncio
-async def test_redirect_loop_calling_next():
- client = AsyncClient(dispatch=MockDispatch())
+async def test_redirect_loop_calling_next(backend):
+ client = AsyncClient(dispatch=MockDispatch(), backend=backend)
url = "https://example.org/redirect_loop"
response = await client.get(url, allow_redirects=False)
with pytest.raises(RedirectLoop):
response = await response.next()
-@pytest.mark.asyncio
-async def test_cross_domain_redirect():
- client = AsyncClient(dispatch=MockDispatch())
+async def test_cross_domain_redirect(backend):
+ client = AsyncClient(dispatch=MockDispatch(), backend=backend)
url = "https://example.com/cross_domain"
headers = {"Authorization": "abc"}
response = await client.get(url, headers=headers)
assert "authorization" not in response.json()["headers"]
-@pytest.mark.asyncio
-async def test_same_domain_redirect():
- client = AsyncClient(dispatch=MockDispatch())
+async def test_same_domain_redirect(backend):
+ client = AsyncClient(dispatch=MockDispatch(), backend=backend)
url = "https://example.org/cross_domain"
headers = {"Authorization": "abc"}
response = await client.get(url, headers=headers)
assert response.json()["headers"]["authorization"] == "abc"
-@pytest.mark.asyncio
-async def test_body_redirect():
- client = AsyncClient(dispatch=MockDispatch())
+async def test_body_redirect(backend):
+ client = AsyncClient(dispatch=MockDispatch(), backend=backend)
url = "https://example.org/redirect_body"
data = b"Example request body"
response = await client.post(url, data=data)
assert response.json() == {"body": "Example request body"}
-@pytest.mark.asyncio
-async def test_cannot_redirect_streaming_body():
- client = AsyncClient(dispatch=MockDispatch())
+async def test_cannot_redirect_streaming_body(backend):
+ client = AsyncClient(dispatch=MockDispatch(), backend=backend)
url = "https://example.org/redirect_body"
async def streaming_body():
await client.post(url, data=streaming_body())
-@pytest.mark.asyncio
-async def test_cross_dubdomain_redirect():
- client = AsyncClient(dispatch=MockDispatch())
+async def test_cross_dubdomain_redirect(backend):
+ client = AsyncClient(dispatch=MockDispatch(), backend=backend)
url = "https://example.com/cross_subdomain"
response = await client.get(url)
assert response.url == URL("https://www.example.org/cross_subdomain")
--- /dev/null
+"""
+This module contains concurrency utilities that are only used in tests, thus not
+required as part of the ConcurrencyBackend API.
+"""
+
+import asyncio
+import functools
+
+from httpx import AsyncioBackend
+
+
+@functools.singledispatch
+async def sleep(backend, seconds: int):
+ raise NotImplementedError # pragma: no cover
+
+
+@sleep.register(AsyncioBackend)
+async def _sleep_asyncio(backend, seconds: int):
+ await asyncio.sleep(seconds)
from uvicorn.config import Config
from uvicorn.main import Server
+from httpx import AsyncioBackend
+
+
+@pytest.fixture(params=[pytest.param(AsyncioBackend, marks=pytest.mark.asyncio)])
+def backend(request):
+ backend_cls = request.param
+ return backend_cls()
+
async def app(scope, receive, send):
assert scope["type"] == "http"
finally:
server.should_exit = True
await task
+
+
+@pytest.fixture
+def restart(backend):
+ async def asyncio_restart(server):
+ await server.shutdown()
+ await server.startup()
+
+ if isinstance(backend, AsyncioBackend):
+ return asyncio_restart
+
+ # The uvicorn server runs under asyncio, so we will need to figure out
+ # how to restart it under a different I/O library.
+ # This will most likely require running `asyncio_restart` in the threadpool,
+ # but that might not be sufficient.
+ raise NotImplementedError
-import pytest
-
import httpx
-@pytest.mark.asyncio
-async def test_keepalive_connections(server):
+async def test_keepalive_connections(server, backend):
"""
Connections should default to staying in a keep-alive state.
"""
- async with httpx.ConnectionPool() as http:
+ async with httpx.ConnectionPool(backend=backend) as http:
response = await http.request("GET", "http://127.0.0.1:8000/")
await response.read()
assert len(http.active_connections) == 0
assert len(http.keepalive_connections) == 1
-@pytest.mark.asyncio
-async def test_differing_connection_keys(server):
+async def test_differing_connection_keys(server, backend):
"""
Connections to differing connection keys should result in multiple connections.
"""
- async with httpx.ConnectionPool() as http:
+ async with httpx.ConnectionPool(backend=backend) as http:
response = await http.request("GET", "http://127.0.0.1:8000/")
await response.read()
assert len(http.active_connections) == 0
assert len(http.keepalive_connections) == 2
-@pytest.mark.asyncio
-async def test_soft_limit(server):
+async def test_soft_limit(server, backend):
"""
The soft_limit config should limit the maximum number of keep-alive connections.
"""
pool_limits = httpx.PoolLimits(soft_limit=1)
- async with httpx.ConnectionPool(pool_limits=pool_limits) as http:
+ async with httpx.ConnectionPool(pool_limits=pool_limits, backend=backend) as http:
response = await http.request("GET", "http://127.0.0.1:8000/")
await response.read()
assert len(http.active_connections) == 0
assert len(http.keepalive_connections) == 1
-@pytest.mark.asyncio
-async def test_streaming_response_holds_connection(server):
+async def test_streaming_response_holds_connection(server, backend):
"""
A streaming request should hold the connection open until the response is read.
"""
- async with httpx.ConnectionPool() as http:
+ async with httpx.ConnectionPool(backend=backend) as http:
response = await http.request("GET", "http://127.0.0.1:8000/")
assert len(http.active_connections) == 1
assert len(http.keepalive_connections) == 0
assert len(http.keepalive_connections) == 1
-@pytest.mark.asyncio
-async def test_multiple_concurrent_connections(server):
+async def test_multiple_concurrent_connections(server, backend):
"""
Multiple conncurrent requests should open multiple conncurrent connections.
"""
- async with httpx.ConnectionPool() as http:
+ async with httpx.ConnectionPool(backend=backend) as http:
response_a = await http.request("GET", "http://127.0.0.1:8000/")
assert len(http.active_connections) == 1
assert len(http.keepalive_connections) == 0
assert len(http.keepalive_connections) == 2
-@pytest.mark.asyncio
-async def test_close_connections(server):
+async def test_close_connections(server, backend):
"""
Using a `Connection: close` header should close the connection.
"""
headers = [(b"connection", b"close")]
- async with httpx.ConnectionPool() as http:
+ async with httpx.ConnectionPool(backend=backend) as http:
response = await http.request("GET", "http://127.0.0.1:8000/", headers=headers)
await response.read()
assert len(http.active_connections) == 0
assert len(http.keepalive_connections) == 0
-@pytest.mark.asyncio
-async def test_standard_response_close(server):
+async def test_standard_response_close(server, backend):
"""
A standard close should keep the connection open.
"""
- async with httpx.ConnectionPool() as http:
+ async with httpx.ConnectionPool(backend=backend) as http:
response = await http.request("GET", "http://127.0.0.1:8000/")
await response.read()
await response.close()
assert len(http.keepalive_connections) == 1
-@pytest.mark.asyncio
-async def test_premature_response_close(server):
+async def test_premature_response_close(server, backend):
"""
A premature close should close the connection.
"""
- async with httpx.ConnectionPool() as http:
+ async with httpx.ConnectionPool(backend=backend) as http:
response = await http.request("GET", "http://127.0.0.1:8000/")
await response.close()
assert len(http.active_connections) == 0
assert len(http.keepalive_connections) == 0
-@pytest.mark.asyncio
-async def test_keepalive_connection_closed_by_server_is_reestablished(server):
+async def test_keepalive_connection_closed_by_server_is_reestablished(
+ server, restart, backend
+):
"""
Upon keep-alive connection closed by remote a new connection
should be reestablished.
"""
- async with httpx.ConnectionPool() as http:
+ async with httpx.ConnectionPool(backend=backend) as http:
response = await http.request("GET", "http://127.0.0.1:8000/")
await response.read()
# shutdown the server to close the keep-alive connection
- await server.shutdown()
- await server.startup()
+ await restart(server)
response = await http.request("GET", "http://127.0.0.1:8000/")
await response.read()
assert len(http.keepalive_connections) == 1
-@pytest.mark.asyncio
-async def test_keepalive_http2_connection_closed_by_server_is_reestablished(server):
+async def test_keepalive_http2_connection_closed_by_server_is_reestablished(
+ server, restart, backend
+):
"""
Upon keep-alive connection closed by remote a new connection
should be reestablished.
"""
- async with httpx.ConnectionPool() as http:
+ async with httpx.ConnectionPool(backend=backend) as http:
response = await http.request("GET", "http://127.0.0.1:8000/")
await response.read()
# shutdown the server to close the keep-alive connection
- await server.shutdown()
- await server.startup()
+ await restart(server)
response = await http.request("GET", "http://127.0.0.1:8000/")
await response.read()
assert len(http.keepalive_connections) == 1
-@pytest.mark.asyncio
-async def test_connection_closed_free_semaphore_on_acquire(server):
+async def test_connection_closed_free_semaphore_on_acquire(server, restart, backend):
"""
Verify that max_connections semaphore is released
properly on a disconnected connection.
await response.read()
# Close the connection so we're forced to recycle it
- await server.shutdown()
- await server.startup()
+ await restart(server)
response = await http.request("GET", "http://127.0.0.1:8000/")
assert response.status_code == 200
-import pytest
-
from httpx import HTTPConnection
-@pytest.mark.asyncio
-async def test_get(server):
- conn = HTTPConnection(origin="http://127.0.0.1:8000/")
+async def test_get(server, backend):
+ conn = HTTPConnection(origin="http://127.0.0.1:8000/", backend=backend)
response = await conn.request("GET", "http://127.0.0.1:8000/")
await response.read()
assert response.status_code == 200
assert response.content == b"Hello, world!"
-@pytest.mark.asyncio
-async def test_post(server):
- conn = HTTPConnection(origin="http://127.0.0.1:8000/")
+async def test_post(server, backend):
+ conn = HTTPConnection(origin="http://127.0.0.1:8000/", backend=backend)
response = await conn.request(
"GET", "http://127.0.0.1:8000/", data=b"Hello, world!"
)
assert response.status_code == 200
-@pytest.mark.asyncio
-async def test_https_get_with_ssl_defaults(https_server):
+async def test_https_get_with_ssl_defaults(https_server, backend):
"""
An HTTPS request, with default SSL configuration set on the client.
"""
- conn = HTTPConnection(origin="https://127.0.0.1:8001/", verify=False)
+ conn = HTTPConnection(
+ origin="https://127.0.0.1:8001/", verify=False, backend=backend
+ )
response = await conn.request("GET", "https://127.0.0.1:8001/")
await response.read()
assert response.status_code == 200
assert response.content == b"Hello, world!"
-@pytest.mark.asyncio
-async def test_https_get_with_sll_overrides(https_server):
+async def test_https_get_with_sll_overrides(https_server, backend):
"""
An HTTPS request, with SSL configuration set on the request.
"""
- conn = HTTPConnection(origin="https://127.0.0.1:8001/")
+ conn = HTTPConnection(origin="https://127.0.0.1:8001/", backend=backend)
response = await conn.request("GET", "https://127.0.0.1:8001/", verify=False)
await response.read()
assert response.status_code == 200
import json
-import pytest
-
from httpx import AsyncClient, Client, Response
from .utils import MockHTTP2Backend
assert json.loads(response.content) == {"method": "GET", "path": "/", "body": ""}
-@pytest.mark.asyncio
-async def test_async_http2_get_request():
- backend = MockHTTP2Backend(app=app)
+async def test_async_http2_get_request(backend):
+ backend = MockHTTP2Backend(app=app, backend=backend)
async with AsyncClient(backend=backend) as client:
response = await client.get("http://example.org")
}
-@pytest.mark.asyncio
-async def test_async_http2_post_request():
- backend = MockHTTP2Backend(app=app)
+async def test_async_http2_post_request(backend):
+ backend = MockHTTP2Backend(app=app, backend=backend)
async with AsyncClient(backend=backend) as client:
response = await client.post("http://example.org", data=b"<data>")
assert json.loads(response_3.content) == {"method": "GET", "path": "/3", "body": ""}
-@pytest.mark.asyncio
-async def test_async_http2_multiple_requests():
- backend = MockHTTP2Backend(app=app)
+async def test_async_http2_multiple_requests(backend):
+ backend = MockHTTP2Backend(app=app, backend=backend)
async with AsyncClient(backend=backend) as client:
response_1 = await client.get("http://example.org/1")
assert json.loads(response_2.content) == {"method": "GET", "path": "/2", "body": ""}
-@pytest.mark.asyncio
-async def test_async_http2_reconnect():
+async def test_async_http2_reconnect(backend):
"""
If a connection has been dropped between requests, then we should
be seemlessly reconnected.
"""
- backend = MockHTTP2Backend(app=app)
+ backend = MockHTTP2Backend(app=app, backend=backend)
async with AsyncClient(backend=backend) as client:
response_1 = await client.get("http://example.org/1")
import json
-import pytest
-
from httpx import (
AsyncClient,
CertTypes,
assert response.json() == {"hello": "world"}
-@pytest.mark.asyncio
-async def test_async_threaded_dispatch():
+async def test_async_threaded_dispatch(backend):
"""
Use a synchronous 'Dispatcher' class with the async client.
Calls to the dispatcher will end up running within a thread pool.
"""
url = "https://example.org/"
- async with AsyncClient(dispatch=MockDispatch()) as client:
+ async with AsyncClient(dispatch=MockDispatch(), backend=backend) as client:
response = await client.get(url)
assert response.status_code == 200
-import asyncio
import ssl
import typing
import h2.events
from httpx import AsyncioBackend, BaseStream, Request, TimeoutConfig
+from tests.concurrency import sleep
-class MockHTTP2Backend(AsyncioBackend):
- def __init__(self, app):
+class MockHTTP2Backend:
+ def __init__(self, app, backend=None):
self.app = app
+ self.backend = AsyncioBackend() if backend is None else backend
self.server = None
async def connect(
ssl_context: typing.Optional[ssl.SSLContext],
timeout: TimeoutConfig,
) -> BaseStream:
- self.server = MockHTTP2Server(self.app)
+ self.server = MockHTTP2Server(self.app, backend=self.backend)
return self.server
+ # Defer all other attributes and methods to the underlying backend.
+ def __getattr__(self, name: str) -> typing.Any:
+ return getattr(self.backend, name)
+
class MockHTTP2Server(BaseStream):
- def __init__(self, app):
+ def __init__(self, app, backend):
config = h2.config.H2Configuration(client_side=False)
self.conn = h2.connection.H2Connection(config=config)
self.app = app
+ self.backend = backend
self.buffer = b""
self.requests = {}
self.close_connection = False
return "HTTP/2"
async def read(self, n, timeout, flag=None) -> bytes:
- await asyncio.sleep(0)
+ await sleep(self.backend, 0)
send, self.buffer = self.buffer[:n], self.buffer[n:]
return send
assert response.text == "Hello, World!"
-@pytest.mark.asyncio
-async def test_asgi_async():
- client = httpx.AsyncClient(app=hello_world)
+async def test_asgi_async(backend):
+ client = httpx.AsyncClient(app=hello_world, backend=backend)
response = await client.get("http://www.example.org/")
assert response.status_code == 200
assert response.text == "Hello, World!"
assert response.text == "example"
-@pytest.mark.asyncio
-async def test_asgi_upload_async():
- client = httpx.AsyncClient(app=echo_body)
+async def test_asgi_upload_async(backend):
+ client = httpx.AsyncClient(app=echo_body, backend=backend)
response = await client.post("http://www.example.org/", data=b"example")
assert response.status_code == 200
assert response.text == "example"
client.get("http://www.example.org/")
-@pytest.mark.asyncio
-async def test_asgi_exc_async():
- client = httpx.AsyncClient(app=raise_exc)
+async def test_asgi_exc_async(backend):
+ client = httpx.AsyncClient(app=raise_exc, backend=backend)
with pytest.raises(ValueError):
await client.get("http://www.example.org/")
client.get("http://www.example.org/")
-@pytest.mark.asyncio
-async def test_asgi_exc_after_response_async():
- client = httpx.AsyncClient(app=raise_exc_after_response)
+async def test_asgi_exc_after_response_async(backend):
+ client = httpx.AsyncClient(app=raise_exc_after_response, backend=backend)
with pytest.raises(ValueError):
await client.get("http://www.example.org/")
)
-@pytest.mark.asyncio
-async def test_read_timeout(server):
- timeout = TimeoutConfig(read_timeout=0.000001)
+async def test_read_timeout(server, backend):
+ timeout = TimeoutConfig(read_timeout=1e-6)
- async with AsyncClient(timeout=timeout) as client:
+ async with AsyncClient(timeout=timeout, backend=backend) as client:
with pytest.raises(ReadTimeout):
await client.get("http://127.0.0.1:8000/slow_response")
-@pytest.mark.asyncio
-async def test_write_timeout(server):
- timeout = TimeoutConfig(write_timeout=0.000001)
+async def test_write_timeout(server, backend):
+ timeout = TimeoutConfig(write_timeout=1e-6)
- async with AsyncClient(timeout=timeout) as client:
+ async with AsyncClient(timeout=timeout, backend=backend) as client:
with pytest.raises(WriteTimeout):
data = b"*" * 1024 * 1024 * 100
await client.put("http://127.0.0.1:8000/slow_response", data=data)
-@pytest.mark.asyncio
-async def test_connect_timeout(server):
- timeout = TimeoutConfig(connect_timeout=0.000001)
+async def test_connect_timeout(server, backend):
+ timeout = TimeoutConfig(connect_timeout=1e-6)
- async with AsyncClient(timeout=timeout) as client:
+ async with AsyncClient(timeout=timeout, backend=backend) as client:
with pytest.raises(ConnectTimeout):
# See https://stackoverflow.com/questions/100841/
await client.get("http://10.255.255.1/")
-@pytest.mark.asyncio
-async def test_pool_timeout(server):
- pool_limits = PoolLimits(hard_limit=1, pool_timeout=0.000001)
+async def test_pool_timeout(server, backend):
+ pool_limits = PoolLimits(hard_limit=1, pool_timeout=1e-6)
- async with AsyncClient(pool_limits=pool_limits) as client:
+ async with AsyncClient(pool_limits=pool_limits, backend=backend) as client:
response = await client.get("http://127.0.0.1:8000/", stream=True)
with pytest.raises(PoolTimeout):