async def test_get(server, backend):
url = server.url
- async with httpx.Client(backend=backend) as client:
+ async with httpx.Client() as client:
response = await client.get(url)
assert response.status_code == 200
assert response.text == "Hello, world!"
async def test_build_request(server, backend):
url = server.url.copy_with(path="/echo_headers")
headers = {"Custom-header": "value"}
- async with httpx.Client(backend=backend) as client:
+ async with httpx.Client() as client:
request = client.build_request("GET", url)
request.headers.update(headers)
response = await client.send(request)
async def test_post(server, backend):
url = server.url
- async with httpx.Client(backend=backend) as client:
+ async with httpx.Client() as client:
response = await client.post(url, data=b"Hello, world!")
assert response.status_code == 200
async def test_post_json(server, backend):
url = server.url
- async with httpx.Client(backend=backend) as client:
+ async with httpx.Client() as client:
response = await client.post(url, json={"text": "Hello, world!"})
assert response.status_code == 200
async def test_stream_response(server, backend):
- async with httpx.Client(backend=backend) as client:
+ async with httpx.Client() as client:
async with client.stream("GET", server.url) as response:
body = await response.read()
async def test_access_content_stream_response(server, backend):
- async with httpx.Client(backend=backend) as client:
+ async with httpx.Client() as client:
async with client.stream("GET", server.url) as response:
pass
yield b"Hello, "
yield b"world!"
- async with httpx.Client(backend=backend) as client:
+ async with httpx.Client() as client:
response = await client.request("POST", server.url, data=hello_world())
assert response.status_code == 200
async def test_raise_for_status(server, backend):
- async with httpx.Client(backend=backend) as client:
+ async with httpx.Client() as client:
for status_code in (200, 400, 404, 500, 505):
response = await client.request(
"GET", server.url.copy_with(path=f"/status/{status_code}")
async def test_options(server, backend):
- async with httpx.Client(backend=backend) as client:
+ async with httpx.Client() as client:
response = await client.options(server.url)
assert response.status_code == 200
assert response.text == "Hello, world!"
async def test_head(server, backend):
- async with httpx.Client(backend=backend) as client:
+ async with httpx.Client() as client:
response = await client.head(server.url)
assert response.status_code == 200
assert response.text == ""
async def test_put(server, backend):
- async with httpx.Client(backend=backend) as client:
+ async with httpx.Client() as client:
response = await client.put(server.url, data=b"Hello, world!")
assert response.status_code == 200
async def test_patch(server, backend):
- async with httpx.Client(backend=backend) as client:
+ async with httpx.Client() as client:
response = await client.patch(server.url, data=b"Hello, world!")
assert response.status_code == 200
async def test_delete(server, backend):
- async with httpx.Client(backend=backend) as client:
+ async with httpx.Client() as client:
response = await client.delete(server.url)
assert response.status_code == 200
assert response.text == "Hello, world!"
headers = {"Expect": "100-continue"}
data = b"Echo request body"
- async with httpx.Client(backend=backend) as client:
+ async with httpx.Client() as client:
response = await client.post(
server.url.copy_with(path="/echo_body"), headers=headers, data=data
)
url = uds_server.url
uds = uds_server.config.uds
assert uds is not None
- async with httpx.Client(backend=backend, uds=uds) as client:
+ async with httpx.Client(uds=uds) as client:
response = await client.get(url)
assert response.status_code == 200
assert response.text == "Hello, world!"
assert response.encoding == "iso-8859-1"
+
+
+@pytest.mark.parametrize(
+ "backend",
+ [
+ pytest.param("asyncio", marks=pytest.mark.asyncio),
+ pytest.param("trio", marks=pytest.mark.trio),
+ ],
+)
+async def test_explicit_backend(server, backend):
+ async with httpx.Client(backend=backend) as client:
+ response = await client.get(server.url)
+ assert response.status_code == 200
+ assert response.text == "Hello, world!"
async def test_no_redirect(backend):
- client = Client(dispatch=MockDispatch(), backend=backend)
+ client = Client(dispatch=MockDispatch())
url = "https://example.com/no_redirect"
response = await client.get(url)
assert response.status_code == 200
async def test_redirect_301(backend):
- client = Client(dispatch=MockDispatch(), backend=backend)
+ client = Client(dispatch=MockDispatch())
response = await client.post("https://example.org/redirect_301")
assert response.status_code == codes.OK
assert response.url == URL("https://example.org/")
async def test_redirect_302(backend):
- client = Client(dispatch=MockDispatch(), backend=backend)
+ client = Client(dispatch=MockDispatch())
response = await client.post("https://example.org/redirect_302")
assert response.status_code == codes.OK
assert response.url == URL("https://example.org/")
async def test_redirect_303(backend):
- client = Client(dispatch=MockDispatch(), backend=backend)
+ client = Client(dispatch=MockDispatch())
response = await client.get("https://example.org/redirect_303")
assert response.status_code == codes.OK
assert response.url == URL("https://example.org/")
async def test_disallow_redirects(backend):
- client = Client(dispatch=MockDispatch(), backend=backend)
+ client = Client(dispatch=MockDispatch())
response = await client.post(
"https://example.org/redirect_303", allow_redirects=False
)
async def test_relative_redirect(backend):
- client = Client(dispatch=MockDispatch(), backend=backend)
+ client = Client(dispatch=MockDispatch())
response = await client.get("https://example.org/relative_redirect")
assert response.status_code == codes.OK
assert response.url == URL("https://example.org/")
async def test_no_scheme_redirect(backend):
- client = Client(dispatch=MockDispatch(), backend=backend)
+ client = Client(dispatch=MockDispatch())
response = await client.get("https://example.org/no_scheme_redirect")
assert response.status_code == codes.OK
assert response.url == URL("https://example.org/")
async def test_fragment_redirect(backend):
- client = Client(dispatch=MockDispatch(), backend=backend)
+ client = Client(dispatch=MockDispatch())
response = await client.get("https://example.org/relative_redirect#fragment")
assert response.status_code == codes.OK
assert response.url == URL("https://example.org/#fragment")
async def test_multiple_redirects(backend):
- client = Client(dispatch=MockDispatch(), backend=backend)
+ client = Client(dispatch=MockDispatch())
response = await client.get("https://example.org/multiple_redirects?count=20")
assert response.status_code == codes.OK
assert response.url == URL("https://example.org/multiple_redirects")
async def test_too_many_redirects(backend):
- client = Client(dispatch=MockDispatch(), backend=backend)
+ client = Client(dispatch=MockDispatch())
with pytest.raises(TooManyRedirects):
await client.get("https://example.org/multiple_redirects?count=21")
async def test_too_many_redirects_calling_next(backend):
- client = Client(dispatch=MockDispatch(), backend=backend)
+ client = Client(dispatch=MockDispatch())
url = "https://example.org/multiple_redirects?count=21"
response = await client.get(url, allow_redirects=False)
with pytest.raises(TooManyRedirects):
async def test_redirect_loop(backend):
- client = Client(dispatch=MockDispatch(), backend=backend)
+ client = Client(dispatch=MockDispatch())
with pytest.raises(RedirectLoop):
await client.get("https://example.org/redirect_loop")
async def test_redirect_loop_calling_next(backend):
- client = Client(dispatch=MockDispatch(), backend=backend)
+ client = Client(dispatch=MockDispatch())
url = "https://example.org/redirect_loop"
response = await client.get(url, allow_redirects=False)
with pytest.raises(RedirectLoop):
async def test_cross_domain_redirect(backend):
- client = Client(dispatch=MockDispatch(), backend=backend)
+ client = Client(dispatch=MockDispatch())
url = "https://example.com/cross_domain"
headers = {"Authorization": "abc"}
response = await client.get(url, headers=headers)
async def test_same_domain_redirect(backend):
- client = Client(dispatch=MockDispatch(), backend=backend)
+ client = Client(dispatch=MockDispatch())
url = "https://example.org/cross_domain"
headers = {"Authorization": "abc"}
response = await client.get(url, headers=headers)
"""
A 308 redirect should preserve the request body.
"""
- client = Client(dispatch=MockDispatch(), backend=backend)
+ client = Client(dispatch=MockDispatch())
url = "https://example.org/redirect_body"
data = b"Example request body"
response = await client.post(url, data=data)
"""
A 303 redirect should remove the request body.
"""
- client = Client(dispatch=MockDispatch(), backend=backend)
+ client = Client(dispatch=MockDispatch())
url = "https://example.org/redirect_no_body"
data = b"Example request body"
response = await client.post(url, data=data)
async def test_cannot_redirect_streaming_body(backend):
- client = Client(dispatch=MockDispatch(), backend=backend)
+ client = Client(dispatch=MockDispatch())
url = "https://example.org/redirect_body"
async def streaming_body():
async def test_cross_subdomain_redirect(backend):
- client = Client(dispatch=MockDispatch(), backend=backend)
+ client = Client(dispatch=MockDispatch())
url = "https://example.com/cross_subdomain"
response = await client.get(url)
assert response.url == URL("https://www.example.org/cross_subdomain")
async def test_redirect_cookie_behavior(backend):
- client = Client(dispatch=MockCookieDispatch(), backend=backend)
+ client = Client(dispatch=MockCookieDispatch())
# The client is not logged in.
response = await client.get("https://example.com/")
import trio
from httpx.concurrency.asyncio import AsyncioBackend
+from httpx.concurrency.auto import AutoBackend
from httpx.concurrency.trio import TrioBackend
raise NotImplementedError # pragma: no cover
+@sleep.register(AutoBackend)
+async def _sleep_auto(backend, seconds: int):
+ return await sleep(backend.backend, seconds=seconds)
+
+
@sleep.register(AsyncioBackend)
async def _sleep_asyncio(backend, seconds: int):
await asyncio.sleep(seconds)
raise NotImplementedError # pragma: no cover
+@run_concurrently.register(AutoBackend)
+async def _run_concurrently_auto(backend, *coroutines):
+ await run_concurrently(backend.backend, *coroutines)
+
+
@run_concurrently.register(AsyncioBackend)
async def _run_concurrently_asyncio(backend, *coroutines):
coros = (coroutine() for coroutine in coroutines)
async with trio.open_nursery() as nursery:
for coroutine in coroutines:
nursery.start_soon(coroutine)
+
+
+@functools.singledispatch
+def get_cipher(backend, stream):
+ raise NotImplementedError # pragma: no cover
+
+
+@get_cipher.register(AutoBackend)
+def _get_cipher_auto(backend, stream):
+ return get_cipher(backend.backend, stream)
+
+
+@get_cipher.register(AsyncioBackend)
+def _get_cipher_asyncio(backend, stream):
+ return stream.stream_writer.get_extra_info("cipher", default=None)
+
+
+@get_cipher.register(TrioBackend)
+def get_trio_cipher(backend, stream):
+ return stream.stream.cipher() if isinstance(stream.stream, trio.SSLStream) else None
from httpx import URL
from httpx.concurrency.asyncio import AsyncioBackend
-from httpx.concurrency.trio import TrioBackend
+from httpx.concurrency.base import lookup_backend
ENVIRONMENT_VARIABLES = {
"SSL_CERT_FILE",
@pytest.fixture(
params=[
- pytest.param(AsyncioBackend, marks=pytest.mark.asyncio),
- pytest.param(TrioBackend, marks=pytest.mark.trio),
+ # pytest uses the marks to set up the specified async environment and run
+ # 'async def' test functions. The "auto" backend should then auto-detect
+ # the environment it's running in.
+ # Passing the backend explicitly, e.g. `backend="asyncio"`,
+ # is tested separately.
+ pytest.param("auto", marks=pytest.mark.asyncio),
+ pytest.param("auto", marks=pytest.mark.trio),
]
)
def backend(request):
- backend_cls = request.param
- return backend_cls()
+ return request.param
async def app(scope, receive, send):
This fixture deals with possible differences between the environment of the
test function and that of the server.
"""
+ asyncio_backend = AsyncioBackend()
+ backend_implementation = lookup_backend(backend)
async def restart(server):
- await backend.run_in_threadpool(AsyncioBackend().run, server.restart)
+ await backend_implementation.run_in_threadpool(
+ asyncio_backend.run, server.restart
+ )
return restart
"""
Connections should default to staying in a keep-alive state.
"""
- async with ConnectionPool(backend=backend) as http:
+ async with ConnectionPool() as http:
response = await http.request("GET", server.url)
await response.read()
assert len(http.active_connections) == 0
"""
Connections to differing connection keys should result in multiple connections.
"""
- async with ConnectionPool(backend=backend) as http:
+ async with ConnectionPool() as http:
response = await http.request("GET", server.url)
await response.read()
assert len(http.active_connections) == 0
"""
pool_limits = httpx.PoolLimits(soft_limit=1)
- async with ConnectionPool(pool_limits=pool_limits, backend=backend) as http:
+ async with ConnectionPool(pool_limits=pool_limits) as http:
response = await http.request("GET", server.url)
await response.read()
assert len(http.active_connections) == 0
"""
A streaming request should hold the connection open until the response is read.
"""
- async with ConnectionPool(backend=backend) as http:
+ async with ConnectionPool() as http:
response = await http.request("GET", server.url)
assert len(http.active_connections) == 1
assert len(http.keepalive_connections) == 0
"""
Multiple conncurrent requests should open multiple conncurrent connections.
"""
- async with ConnectionPool(backend=backend) as http:
+ async with ConnectionPool() as http:
response_a = await http.request("GET", server.url)
assert len(http.active_connections) == 1
assert len(http.keepalive_connections) == 0
Using a `Connection: close` header should close the connection.
"""
headers = [(b"connection", b"close")]
- async with ConnectionPool(backend=backend) as http:
+ async with ConnectionPool() as http:
response = await http.request("GET", server.url, headers=headers)
await response.read()
assert len(http.active_connections) == 0
"""
A standard close should keep the connection open.
"""
- async with ConnectionPool(backend=backend) as http:
+ async with ConnectionPool() as http:
response = await http.request("GET", server.url)
await response.read()
await response.close()
"""
A premature close should close the connection.
"""
- async with ConnectionPool(backend=backend) as http:
+ async with ConnectionPool() as http:
response = await http.request("GET", server.url)
await response.close()
assert len(http.active_connections) == 0
Upon keep-alive connection closed by remote a new connection
should be reestablished.
"""
- async with ConnectionPool(backend=backend) as http:
+ async with ConnectionPool() as http:
response = await http.request("GET", server.url)
await response.read()
Upon keep-alive connection closed by remote a new connection
should be reestablished.
"""
- async with ConnectionPool(backend=backend) as http:
+ async with ConnectionPool() as http:
response = await http.request("GET", server.url)
await response.read()
Verify that max_connections semaphore is released
properly on a disconnected connection.
"""
- async with ConnectionPool(
- pool_limits=httpx.PoolLimits(hard_limit=1), backend=backend
- ) as http:
+ async with ConnectionPool(pool_limits=httpx.PoolLimits(hard_limit=1)) as http:
response = await http.request("GET", server.url)
await response.read()
async def test_get(server, backend):
- async with HTTPConnection(origin=server.url, backend=backend) as conn:
+ async with HTTPConnection(origin=server.url) as conn:
response = await conn.request("GET", server.url)
await response.read()
assert response.status_code == 200
async def test_post(server, backend):
- async with HTTPConnection(origin=server.url, backend=backend) as conn:
+ async with HTTPConnection(origin=server.url) as conn:
response = await conn.request("GET", server.url, data=b"Hello, world!")
assert response.status_code == 200
async def test_premature_close(server, backend):
with pytest.raises(httpx.ConnectionClosed):
- async with HTTPConnection(origin=server.url, backend=backend) as conn:
+ async with HTTPConnection(origin=server.url) as conn:
response = await conn.request(
"GET", server.url.copy_with(path="/premature_close")
)
"""
An HTTPS request, with default SSL configuration set on the client.
"""
- async with HTTPConnection(
- origin=https_server.url, verify=ca_cert_pem_file, backend=backend
- ) as conn:
+ async with HTTPConnection(origin=https_server.url, verify=ca_cert_pem_file) as conn:
response = await conn.request("GET", https_server.url)
await response.read()
assert response.status_code == 200
"""
An HTTPS request, with SSL configuration set on the request.
"""
- async with HTTPConnection(origin=https_server.url, backend=backend) as conn:
+ async with HTTPConnection(origin=https_server.url) as conn:
response = await conn.request("GET", https_server.url, verify=ca_cert_pem_file)
await response.read()
assert response.status_code == 200
async def test_http2_settings_in_handshake(backend):
- backend = MockHTTP2Backend(app=app, backend=backend)
+ backend = MockHTTP2Backend(app=app)
async with Client(backend=backend, http2=True) as client:
await client.get("http://example.org")
async def test_http2_live_request(backend):
- async with Client(backend=backend, http2=True) as client:
+ async with Client(http2=True) as client:
try:
resp = await client.get("https://nghttp2.org/httpbin/anything")
except TimeoutException:
b"\r\n",
]
),
- backend=backend,
)
async with httpx.HTTPProxy(
proxy_url="http://127.0.0.1:8000", backend=raw_io, proxy_mode="TUNNEL_ONLY",
b"\r\n",
]
),
- backend=backend,
)
with pytest.raises(httpx.ProxyError) as e:
b"\r\n",
]
),
- backend=backend,
)
async with httpx.HTTPProxy(
proxy_url="http://127.0.0.1:8000", backend=raw_io, proxy_mode="TUNNEL_ONLY",
b"\r\n"
]
),
- backend=backend,
)
async with httpx.HTTPProxy(
proxy_url="http://127.0.0.1:8000",
import h2.events
from httpx import Request, Timeout
-from httpx.concurrency.asyncio import AsyncioBackend
-from httpx.concurrency.base import BaseSocketStream
+from httpx.concurrency.base import BaseSocketStream, lookup_backend
from tests.concurrency import sleep
class MockHTTP2Backend:
- def __init__(self, app, backend=None):
+ def __init__(self, app, backend="auto"):
self.app = app
- self.backend = AsyncioBackend() if backend is None else backend
+ self.backend = lookup_backend(backend)
self.server = None
async def open_tcp_stream(
class MockRawSocketBackend:
- def __init__(self, data_to_send=b"", backend=None):
- self.backend = AsyncioBackend() if backend is None else backend
+ def __init__(self, data_to_send=b"", backend="auto"):
+ self.backend = lookup_backend(backend)
self.data_to_send = data_to_send
self.received_data = []
self.stream = MockRawSocketStream(self)
+import asyncio
+
import pytest
import trio
from httpx import Timeout
from httpx.concurrency.asyncio import AsyncioBackend
+from httpx.concurrency.base import lookup_backend
from httpx.concurrency.trio import TrioBackend
from httpx.config import SSLConfig
-from tests.concurrency import run_concurrently, sleep
-
-
-def get_asyncio_cipher(stream):
- return stream.stream_writer.get_extra_info("cipher", default=None)
-
-
-def get_trio_cipher(stream):
- return stream.stream.cipher() if isinstance(stream.stream, trio.SSLStream) else None
+from tests.concurrency import get_cipher, run_concurrently, sleep
async def read_response(stream, timeout: Timeout, should_contain: bytes) -> bytes:
return response
-@pytest.mark.parametrize(
- "backend, get_cipher",
- [
- pytest.param(AsyncioBackend(), get_asyncio_cipher, marks=pytest.mark.asyncio),
- pytest.param(TrioBackend(), get_trio_cipher, marks=pytest.mark.trio),
- ],
-)
-async def test_start_tls_on_tcp_socket_stream(https_server, backend, get_cipher):
+async def test_start_tls_on_tcp_socket_stream(https_server, backend):
+ backend = lookup_backend(backend)
ctx = SSLConfig().load_ssl_context_no_verify()
timeout = Timeout(5)
try:
assert stream.is_connection_dropped() is False
- assert get_cipher(stream) is None
+ assert get_cipher(backend, stream) is None
stream = await stream.start_tls(https_server.url.host, ctx, timeout)
assert stream.is_connection_dropped() is False
- assert get_cipher(stream) is not None
+ assert get_cipher(backend, stream) is not None
await stream.write(b"GET / HTTP/1.1\r\n\r\n", timeout)
await stream.close()
-@pytest.mark.parametrize(
- "backend, get_cipher",
- [
- pytest.param(AsyncioBackend(), get_asyncio_cipher, marks=pytest.mark.asyncio),
- pytest.param(TrioBackend(), get_trio_cipher, marks=pytest.mark.trio),
- ],
-)
-async def test_start_tls_on_uds_socket_stream(https_uds_server, backend, get_cipher):
+async def test_start_tls_on_uds_socket_stream(https_uds_server, backend):
+ backend = lookup_backend(backend)
ctx = SSLConfig().load_ssl_context_no_verify()
timeout = Timeout(5)
try:
assert stream.is_connection_dropped() is False
- assert get_cipher(stream) is None
+ assert get_cipher(backend, stream) is None
stream = await stream.start_tls(https_uds_server.url.host, ctx, timeout)
assert stream.is_connection_dropped() is False
- assert get_cipher(stream) is not None
+ assert get_cipher(backend, stream) is not None
await stream.write(b"GET / HTTP/1.1\r\n\r\n", timeout)
"""
Regression test for: https://github.com/encode/httpx/issues/527
"""
+ backend = lookup_backend(backend)
stream = await backend.open_tcp_stream(
server.url.host, server.url.port, ssl_context=None, timeout=Timeout(5)
)
async def test_fork(backend):
+ backend = lookup_backend(backend)
ok_counter = 0
async def ok(delay: int) -> None:
# No 'match', since we can't know which will be raised first.
with pytest.raises(RuntimeError):
await backend.fork(fail, ["My bad", 0], fail, ["Oops", 0])
+
+
+def test_lookup_backend():
+ assert isinstance(lookup_backend("asyncio"), AsyncioBackend)
+ assert isinstance(lookup_backend("trio"), TrioBackend)
+ assert isinstance(lookup_backend(AsyncioBackend()), AsyncioBackend)
+
+ async def get_backend_from_auto():
+ auto_backend = lookup_backend("auto")
+ return auto_backend.backend
+
+ loop = asyncio.get_event_loop()
+ backend = loop.run_until_complete(get_backend_from_auto())
+ assert isinstance(backend, AsyncioBackend)
+
+ backend = trio.run(get_backend_from_auto)
+ assert isinstance(backend, TrioBackend)
+
+ with pytest.raises(Exception, match="unknownio"):
+ lookup_backend("unknownio")
async def test_read_timeout(server, backend):
timeout = httpx.Timeout(read_timeout=1e-6)
- async with httpx.Client(timeout=timeout, backend=backend) as client:
+ async with httpx.Client(timeout=timeout) as client:
with pytest.raises(httpx.ReadTimeout):
await client.get(server.url.copy_with(path="/slow_response"))
async def test_write_timeout(server, backend):
timeout = httpx.Timeout(write_timeout=1e-6)
- async with httpx.Client(timeout=timeout, backend=backend) as client:
+ async with httpx.Client(timeout=timeout) as client:
with pytest.raises(httpx.WriteTimeout):
data = b"*" * 1024 * 1024 * 100
await client.put(server.url.copy_with(path="/slow_response"), data=data)
async def test_connect_timeout(server, backend):
timeout = httpx.Timeout(connect_timeout=1e-6)
- async with httpx.Client(timeout=timeout, backend=backend) as client:
+ async with httpx.Client(timeout=timeout) as client:
with pytest.raises(httpx.ConnectTimeout):
# See https://stackoverflow.com/questions/100841/
await client.get("http://10.255.255.1/")
pool_limits = httpx.PoolLimits(hard_limit=1)
timeout = httpx.Timeout(pool_timeout=1e-4)
- async with httpx.Client(
- pool_limits=pool_limits, timeout=timeout, backend=backend
- ) as client:
+ async with httpx.Client(pool_limits=pool_limits, timeout=timeout) as client:
async with client.stream("GET", server.url):
with pytest.raises(httpx.PoolTimeout):
await client.get("http://localhost:8000/")