```python
proxy = httpx.HTTPProxy(
proxy_url="https://127.0.0.1",
- proxy_mode=httpx.HTTPProxyMode.TUNNEL_ONLY
+ proxy_mode="TUNNEL_ONLY" # May be "TUNNEL_ONLY" or "FORWARD_ONLY". Defaults to "DEFAULT".
)
async with httpx.Client(proxies=proxy) as client:
# This request will be tunneled instead of forwarded.
from .api import delete, get, head, options, patch, post, put, request, stream
from .auth import BasicAuth, DigestAuth
from .client import Client
-from .concurrency.asyncio import AsyncioBackend
-from .concurrency.base import BasePoolSemaphore, BaseSocketStream, ConcurrencyBackend
-from .config import (
- USER_AGENT,
- CertTypes,
- PoolLimits,
- SSLConfig,
- Timeout,
- TimeoutConfig,
- TimeoutTypes,
- VerifyTypes,
-)
-from .dispatch.base import Dispatcher
-from .dispatch.connection import HTTPConnection
-from .dispatch.connection_pool import ConnectionPool
+from .config import TimeoutConfig # For 0.8 backwards compat.
+from .config import PoolLimits, Timeout
from .dispatch.proxy_http import HTTPProxy, HTTPProxyMode
from .exceptions import (
+ ConnectionClosed,
ConnectTimeout,
CookieConflict,
DecodingError,
TooManyRedirects,
WriteTimeout,
)
-from .models import (
- URL,
- AuthTypes,
- Cookies,
- CookieTypes,
- Headers,
- HeaderTypes,
- Origin,
- QueryParams,
- QueryParamTypes,
- Request,
- RequestData,
- RequestFiles,
- Response,
- ResponseContent,
- URLTypes,
-)
+from .models import URL, Cookies, Headers, Origin, QueryParams, Request, Response
from .status_codes import StatusCode, codes
__all__ = [
"put",
"request",
"stream",
+ "codes",
"BasicAuth",
"Client",
"DigestAuth",
- "AsyncioBackend",
- "USER_AGENT",
- "CertTypes",
"PoolLimits",
- "SSLConfig",
"Timeout",
- "TimeoutConfig",
- "VerifyTypes",
- "HTTPConnection",
- "BasePoolSemaphore",
- "ConnectionPool",
+ "TimeoutConfig", # For 0.8 backwards compat.
"HTTPProxy",
- "HTTPProxyMode",
+ "HTTPProxyMode", # For 0.8 backwards compat.
"ConnectTimeout",
"CookieConflict",
+ "ConnectionClosed",
"DecodingError",
"HTTPError",
"InvalidURL",
"WriteTimeout",
"BaseSocketStream",
"ConcurrencyBackend",
- "Dispatcher",
"URL",
- "URLTypes",
"StatusCode",
- "codes",
- "TimeoutTypes",
- "AuthTypes",
"Cookies",
- "CookieTypes",
"Headers",
- "HeaderTypes",
"Origin",
"QueryParams",
- "QueryParamTypes",
"Request",
- "RequestData",
"TimeoutException",
"Response",
- "ResponseContent",
- "RequestFiles",
"DigestAuth",
]
import enum
import typing
+import warnings
from base64 import b64encode
from ..concurrency.base import ConcurrencyBackend
class HTTPProxyMode(enum.Enum):
+ # This enum is pending deprecation in order to reduce API surface area,
+ # but is currently still around for 0.8 backwards compat.
DEFAULT = "DEFAULT"
FORWARD_ONLY = "FORWARD_ONLY"
TUNNEL_ONLY = "TUNNEL_ONLY"
+DEFAULT_MODE = "DEFAULT"
+FORWARD_ONLY = "FORWARD_ONLY"
+TUNNEL_ONLY = "TUNNEL_ONLY"
+
+
class HTTPProxy(ConnectionPool):
"""A proxy that sends requests to the recipient server
on behalf of the connecting client.
proxy_url: URLTypes,
*,
proxy_headers: HeaderTypes = None,
- proxy_mode: HTTPProxyMode = HTTPProxyMode.DEFAULT,
+ proxy_mode: str = "DEFAULT",
verify: VerifyTypes = True,
cert: CertTypes = None,
trust_env: bool = None,
backend: typing.Union[str, ConcurrencyBackend] = "auto",
):
+ if isinstance(proxy_mode, HTTPProxyMode):
+ warnings.warn(
+ "The 'HTTPProxyMode' enum is pending deprecation. "
+ "Use a plain string instead. proxy_mode='FORWARD_ONLY', or "
+ "proxy_mode='TUNNEL_ONLY'."
+ )
+ proxy_mode = proxy_mode.value
+ assert proxy_mode in ("DEFAULT", "FORWARD_ONLY", "TUNNEL_ONLY")
+
super(HTTPProxy, self).__init__(
verify=verify,
cert=cert,
tunnel all 'HTTPS' requests.
"""
return (
- self.proxy_mode == HTTPProxyMode.DEFAULT and not origin.is_ssl
- ) or self.proxy_mode == HTTPProxyMode.FORWARD_ONLY
+ self.proxy_mode == DEFAULT_MODE and not origin.is_ssl
+ ) or self.proxy_mode == FORWARD_ONLY
async def send(
self,
import pytest
-from httpx import (
- URL,
- CertTypes,
- Client,
- DigestAuth,
- Dispatcher,
- ProtocolError,
- Request,
- Response,
- TimeoutTypes,
- VerifyTypes,
-)
+from httpx import URL, Client, DigestAuth, ProtocolError, Request, Response
+from httpx.config import CertTypes, TimeoutTypes, VerifyTypes
+from httpx.dispatch.base import Dispatcher
class MockDispatch(Dispatcher):
import pytest
-from httpx import (
- CertTypes,
- Client,
- Cookies,
- Dispatcher,
- Request,
- Response,
- TimeoutTypes,
- VerifyTypes,
-)
+from httpx import Client, Cookies, Request, Response
+from httpx.config import CertTypes, TimeoutTypes, VerifyTypes
+from httpx.dispatch.base import Dispatcher
class MockDispatch(Dispatcher):
import pytest
-from httpx import (
- CertTypes,
- Client,
- Dispatcher,
- Request,
- Response,
- TimeoutTypes,
- VerifyTypes,
- __version__,
- models,
-)
+from httpx import Client, Headers, Request, Response, __version__
+from httpx.config import CertTypes, TimeoutTypes, VerifyTypes
+from httpx.dispatch.base import Dispatcher
class MockDispatch(Dispatcher):
def test_header_does_not_exist():
- headers = models.Headers({"foo": "bar"})
+ headers = Headers({"foo": "bar"})
with pytest.raises(KeyError):
del headers["baz"]
pool = client.dispatch
proxy = client.proxies["all"]
- assert isinstance(pool, httpx.ConnectionPool)
assert isinstance(proxy, httpx.HTTPProxy)
for prop in [
dispatcher = client.dispatcher_for_url(httpx.URL(url))
if expected is None:
- assert isinstance(dispatcher, httpx.ConnectionPool)
assert dispatcher is client.dispatch
else:
assert isinstance(dispatcher, httpx.HTTPProxy)
import pytest
-from httpx import (
- CertTypes,
- Client,
- Dispatcher,
- QueryParams,
- Request,
- Response,
- TimeoutTypes,
- VerifyTypes,
-)
-from httpx.models import URL
+from httpx import URL, Client, QueryParams, Request, Response
+from httpx.config import CertTypes, TimeoutTypes, VerifyTypes
+from httpx.dispatch.base import Dispatcher
class MockDispatch(Dispatcher):
from httpx import (
URL,
- CertTypes,
Client,
- Dispatcher,
NotRedirectResponse,
RedirectBodyUnavailable,
RedirectLoop,
Request,
Response,
- TimeoutTypes,
TooManyRedirects,
- VerifyTypes,
codes,
)
+from httpx.config import CertTypes, TimeoutTypes, VerifyTypes
+from httpx.dispatch.base import Dispatcher
class MockDispatch(Dispatcher):
import trio
-from httpx import AsyncioBackend
+from httpx.concurrency.asyncio import AsyncioBackend
from httpx.concurrency.trio import TrioBackend
from uvicorn.config import Config
from uvicorn.main import Server
-from httpx import URL, AsyncioBackend
+from httpx import URL
+from httpx.concurrency.asyncio import AsyncioBackend
from httpx.concurrency.trio import TrioBackend
ENVIRONMENT_VARIABLES = {
import httpx
+from httpx.dispatch.connection_pool import ConnectionPool
async def test_keepalive_connections(server, backend):
"""
Connections should default to staying in a keep-alive state.
"""
- async with httpx.ConnectionPool(backend=backend) as http:
+ async with ConnectionPool(backend=backend) as http:
response = await http.request("GET", server.url)
await response.read()
assert len(http.active_connections) == 0
"""
Connections to differing connection keys should result in multiple connections.
"""
- async with httpx.ConnectionPool(backend=backend) as http:
+ async with ConnectionPool(backend=backend) as http:
response = await http.request("GET", server.url)
await response.read()
assert len(http.active_connections) == 0
"""
pool_limits = httpx.PoolLimits(soft_limit=1)
- async with httpx.ConnectionPool(pool_limits=pool_limits, backend=backend) as http:
+ async with ConnectionPool(pool_limits=pool_limits, backend=backend) as http:
response = await http.request("GET", server.url)
await response.read()
assert len(http.active_connections) == 0
"""
A streaming request should hold the connection open until the response is read.
"""
- async with httpx.ConnectionPool(backend=backend) as http:
+ async with ConnectionPool(backend=backend) as http:
response = await http.request("GET", server.url)
assert len(http.active_connections) == 1
assert len(http.keepalive_connections) == 0
"""
Multiple conncurrent requests should open multiple conncurrent connections.
"""
- async with httpx.ConnectionPool(backend=backend) as http:
+ async with ConnectionPool(backend=backend) as http:
response_a = await http.request("GET", server.url)
assert len(http.active_connections) == 1
assert len(http.keepalive_connections) == 0
Using a `Connection: close` header should close the connection.
"""
headers = [(b"connection", b"close")]
- async with httpx.ConnectionPool(backend=backend) as http:
+ async with ConnectionPool(backend=backend) as http:
response = await http.request("GET", server.url, headers=headers)
await response.read()
assert len(http.active_connections) == 0
"""
A standard close should keep the connection open.
"""
- async with httpx.ConnectionPool(backend=backend) as http:
+ async with ConnectionPool(backend=backend) as http:
response = await http.request("GET", server.url)
await response.read()
await response.close()
"""
A premature close should close the connection.
"""
- async with httpx.ConnectionPool(backend=backend) as http:
+ async with ConnectionPool(backend=backend) as http:
response = await http.request("GET", server.url)
await response.close()
assert len(http.active_connections) == 0
Upon keep-alive connection closed by remote a new connection
should be reestablished.
"""
- async with httpx.ConnectionPool(backend=backend) as http:
+ async with ConnectionPool(backend=backend) as http:
response = await http.request("GET", server.url)
await response.read()
Upon keep-alive connection closed by remote a new connection
should be reestablished.
"""
- async with httpx.ConnectionPool(backend=backend) as http:
+ async with ConnectionPool(backend=backend) as http:
response = await http.request("GET", server.url)
await response.read()
Verify that max_connections semaphore is released
properly on a disconnected connection.
"""
- async with httpx.ConnectionPool(
+ async with ConnectionPool(
pool_limits=httpx.PoolLimits(hard_limit=1), backend=backend
) as http:
response = await http.request("GET", server.url)
import pytest
-from httpx import HTTPConnection, exceptions
+import httpx
+from httpx.dispatch.connection import HTTPConnection
async def test_get(server, backend):
async def test_premature_close(server, backend):
- with pytest.raises(exceptions.ConnectionClosed):
+ with pytest.raises(httpx.ConnectionClosed):
async with HTTPConnection(origin=server.url, backend=backend) as conn:
response = await conn.request(
"GET", server.url.copy_with(path="/premature_close")
backend=backend,
)
async with httpx.HTTPProxy(
- proxy_url="http://127.0.0.1:8000",
- backend=raw_io,
- proxy_mode=httpx.HTTPProxyMode.TUNNEL_ONLY,
+ proxy_url="http://127.0.0.1:8000", backend=raw_io, proxy_mode="TUNNEL_ONLY",
) as proxy:
response = await proxy.request("GET", "http://example.com")
with pytest.raises(httpx.ProxyError) as e:
async with httpx.HTTPProxy(
- proxy_url="http://127.0.0.1:8000",
- backend=raw_io,
- proxy_mode=httpx.HTTPProxyMode.TUNNEL_ONLY,
+ proxy_url="http://127.0.0.1:8000", backend=raw_io, proxy_mode="TUNNEL_ONLY",
) as proxy:
await proxy.request("GET", "http://example.com")
backend=backend,
)
async with httpx.HTTPProxy(
- proxy_url="http://127.0.0.1:8000",
- backend=raw_io,
- proxy_mode=httpx.HTTPProxyMode.TUNNEL_ONLY,
+ proxy_url="http://127.0.0.1:8000", backend=raw_io, proxy_mode="TUNNEL_ONLY",
) as proxy:
resp = await proxy.request("GET", "https://example.com")
assert recv[4].startswith(b"GET /target HTTP/1.1\r\nhost: example.com\r\n")
-@pytest.mark.parametrize(
- "proxy_mode", [httpx.HTTPProxyMode.FORWARD_ONLY, httpx.HTTPProxyMode.DEFAULT]
-)
+@pytest.mark.parametrize("proxy_mode", ["FORWARD_ONLY", "DEFAULT"])
async def test_proxy_forwarding(backend, proxy_mode):
raw_io = MockRawSocketBackend(
data_to_send=(
proxy = httpx.HTTPProxy(
"http://127.0.0.1:1080",
proxy_headers={"Custom": "Header"},
- proxy_mode=httpx.HTTPProxyMode.DEFAULT,
+ proxy_mode="DEFAULT",
)
assert repr(proxy) == (
"HTTPProxy(proxy_url=URL('http://127.0.0.1:1080') "
"proxy_headers=Headers({'custom': 'Header'}) "
- "proxy_mode=<HTTPProxyMode.DEFAULT: 'DEFAULT'>)"
+ "proxy_mode='DEFAULT')"
)
import h2.connection
import h2.events
-from httpx import AsyncioBackend, BaseSocketStream, Request, Timeout
+from httpx import Request, Timeout
+from httpx.concurrency.asyncio import AsyncioBackend
+from httpx.concurrency.base import BaseSocketStream
from tests.concurrency import sleep
import pytest
-from httpx import URL, Origin
-from httpx.exceptions import InvalidURL
+from httpx import URL, InvalidURL, Origin
@pytest.mark.parametrize(
import pytest
import trio
-from httpx import AsyncioBackend, SSLConfig, Timeout
+from httpx import Timeout
+from httpx.concurrency.asyncio import AsyncioBackend
from httpx.concurrency.trio import TrioBackend
+from httpx.config import SSLConfig
from tests.concurrency import run_concurrently, sleep
import pytest
import httpx
+from httpx.config import SSLConfig
def test_load_ssl_config():
- ssl_config = httpx.SSLConfig()
+ ssl_config = SSLConfig()
context = ssl_config.load_ssl_context()
assert context.verify_mode == ssl.VerifyMode.CERT_REQUIRED
assert context.check_hostname is True
def test_load_ssl_config_verify_non_existing_path():
- ssl_config = httpx.SSLConfig(verify="/path/to/nowhere")
+ ssl_config = SSLConfig(verify="/path/to/nowhere")
with pytest.raises(IOError):
ssl_config.load_ssl_context()
def test_load_ssl_config_verify_existing_file():
- ssl_config = httpx.SSLConfig(verify=httpx.config.DEFAULT_CA_BUNDLE_PATH)
+ ssl_config = SSLConfig(verify=httpx.config.DEFAULT_CA_BUNDLE_PATH)
context = ssl_config.load_ssl_context()
assert context.verify_mode == ssl.VerifyMode.CERT_REQUIRED
assert context.check_hostname is True
if config.endswith("_FILE")
else str(Path(ca_cert_pem_file).parent)
)
- ssl_config = httpx.SSLConfig(trust_env=True)
+ ssl_config = SSLConfig(trust_env=True)
context = ssl_config.load_ssl_context()
assert context.verify_mode == ssl.VerifyMode.CERT_REQUIRED
assert context.check_hostname is True
def test_load_ssl_config_verify_directory():
path = httpx.config.DEFAULT_CA_BUNDLE_PATH.parent
- ssl_config = httpx.SSLConfig(verify=path)
+ ssl_config = SSLConfig(verify=path)
context = ssl_config.load_ssl_context()
assert context.verify_mode == ssl.VerifyMode.CERT_REQUIRED
assert context.check_hostname is True
def test_load_ssl_config_cert_and_key(cert_pem_file, cert_private_key_file):
- ssl_config = httpx.SSLConfig(cert=(cert_pem_file, cert_private_key_file))
+ ssl_config = SSLConfig(cert=(cert_pem_file, cert_private_key_file))
context = ssl_config.load_ssl_context()
assert context.verify_mode == ssl.VerifyMode.CERT_REQUIRED
assert context.check_hostname is True
def test_load_ssl_config_cert_and_encrypted_key(
cert_pem_file, cert_encrypted_private_key_file, password
):
- ssl_config = httpx.SSLConfig(
+ ssl_config = SSLConfig(
cert=(cert_pem_file, cert_encrypted_private_key_file, password)
)
context = ssl_config.load_ssl_context()
def test_load_ssl_config_cert_and_key_invalid_password(
cert_pem_file, cert_encrypted_private_key_file
):
- ssl_config = httpx.SSLConfig(
+ ssl_config = SSLConfig(
cert=(cert_pem_file, cert_encrypted_private_key_file, "password1")
)
def test_load_ssl_config_cert_without_key_raises(cert_pem_file):
- ssl_config = httpx.SSLConfig(cert=cert_pem_file)
+ ssl_config = SSLConfig(cert=cert_pem_file)
with pytest.raises(ssl.SSLError):
ssl_config.load_ssl_context()
def test_load_ssl_config_no_verify():
- ssl_config = httpx.SSLConfig(verify=False)
+ ssl_config = SSLConfig(verify=False)
context = ssl_config.load_ssl_context()
assert context.verify_mode == ssl.VerifyMode.CERT_NONE
assert context.check_hostname is False
def test_load_ssl_context():
ssl_context = ssl.create_default_context()
- ssl_config = httpx.SSLConfig(verify=ssl_context)
+ ssl_config = SSLConfig(verify=ssl_context)
assert ssl_config.verify is True
assert ssl_config.ssl_context is ssl_context
def test_ssl_repr():
- ssl = httpx.SSLConfig(verify=False)
+ ssl = SSLConfig(verify=False)
assert repr(ssl) == "SSLConfig(cert=None, verify=False)"
+def test_ssl_eq():
+ ssl = SSLConfig(verify=False)
+ assert ssl == SSLConfig(verify=False)
+
+
def test_limits_repr():
limits = httpx.PoolLimits(hard_limit=100)
assert repr(limits) == "PoolLimits(soft_limit=None, hard_limit=100)"
-def test_ssl_eq():
- ssl = httpx.SSLConfig(verify=False)
- assert ssl == httpx.SSLConfig(verify=False)
-
-
def test_limits_eq():
limits = httpx.PoolLimits(hard_limit=100)
assert limits == httpx.PoolLimits(hard_limit=100)
with monkeypatch.context() as m:
m.delenv("SSLKEYLOGFILE", raising=False)
- ssl_config = httpx.SSLConfig(trust_env=True)
+ ssl_config = SSLConfig(trust_env=True)
ssl_config.load_ssl_context()
assert ssl_config.ssl_context.keylog_filename is None
with monkeypatch.context() as m:
m.setenv("SSLKEYLOGFILE", filename)
- ssl_config = httpx.SSLConfig(trust_env=True)
+ ssl_config = SSLConfig(trust_env=True)
ssl_config.load_ssl_context()
assert ssl_config.ssl_context.keylog_filename == filename
- ssl_config = httpx.SSLConfig(trust_env=False)
+ ssl_config = SSLConfig(trust_env=False)
ssl_config.load_ssl_context()
assert ssl_config.ssl_context.keylog_filename is None
headers = [(b"Content-Encoding", header_value)]
body = b"test 123"
compressed_body = brotli.compress(body)[3:]
- with pytest.raises(httpx.exceptions.DecodingError):
+ with pytest.raises(httpx.DecodingError):
response = httpx.Response(200, headers=headers, content=compressed_body)
response.content
import pytest
-from httpx import (
- CertTypes,
- Client,
- Dispatcher,
- Request,
- Response,
- TimeoutTypes,
- VerifyTypes,
- multipart,
-)
+import httpx
+from httpx.config import CertTypes, TimeoutTypes, VerifyTypes
+from httpx.dispatch.base import Dispatcher
+from httpx.multipart import _format_param, multipart_encode
class MockDispatch(Dispatcher):
async def send(
self,
- request: Request,
+ request: httpx.Request,
verify: VerifyTypes = None,
cert: CertTypes = None,
timeout: TimeoutTypes = None,
- ) -> Response:
+ ) -> httpx.Response:
content = await request.read()
- return Response(200, content=content)
+ return httpx.Response(200, content=content)
@pytest.mark.parametrize(("value,output"), (("abc", b"abc"), (b"abc", b"abc")))
@pytest.mark.asyncio
async def test_multipart(value, output):
- client = Client(dispatch=MockDispatch())
+ client = httpx.Client(dispatch=MockDispatch())
# Test with a single-value 'data' argument, and a plain file 'files' argument.
data = {"text": value}
@pytest.mark.parametrize(("key"), (b"abc", 1, 2.3, None))
@pytest.mark.asyncio
async def test_multipart_invalid_key(key):
- client = Client(dispatch=MockDispatch())
+ client = httpx.Client(dispatch=MockDispatch())
data = {key: "abc"}
files = {"file": io.BytesIO(b"<file content>")}
with pytest.raises(TypeError) as e:
@pytest.mark.parametrize(("value"), (1, 2.3, None, [None, "abc"], {None: "abc"}))
@pytest.mark.asyncio
async def test_multipart_invalid_value(value):
- client = Client(dispatch=MockDispatch())
+ client = httpx.Client(dispatch=MockDispatch())
data = {"text": value}
files = {"file": io.BytesIO(b"<file content>")}
with pytest.raises(TypeError) as e:
@pytest.mark.asyncio
async def test_multipart_file_tuple():
- client = Client(dispatch=MockDispatch())
+ client = httpx.Client(dispatch=MockDispatch())
# Test with a list of values 'data' argument, and a tuple style 'files' argument.
data = {"text": ["abc"]}
with mock.patch("os.urandom", return_value=os.urandom(16)):
boundary = binascii.hexlify(os.urandom(16)).decode("ascii")
- body, content_type = multipart.multipart_encode(data=data, files=files)
+ body, content_type = multipart_encode(data=data, files=files)
assert content_type == f"multipart/form-data; boundary={boundary}"
assert body == (
'--{0}\r\nContent-Disposition: form-data; name="a"\r\n\r\n1\r\n'
with mock.patch("os.urandom", return_value=os.urandom(16)):
boundary = binascii.hexlify(os.urandom(16)).decode("ascii")
- body, content_type = multipart.multipart_encode(data={}, files=files)
+ body, content_type = multipart_encode(data={}, files=files)
assert content_type == f"multipart/form-data; boundary={boundary}"
assert body == (
with mock.patch("os.urandom", return_value=os.urandom(16)):
boundary = binascii.hexlify(os.urandom(16)).decode("ascii")
- body, content_type = multipart.multipart_encode(data={}, files=files)
+ body, content_type = multipart_encode(data={}, files=files)
assert content_type == f"multipart/form-data; boundary={boundary}"
assert body == (
with mock.patch("os.urandom", return_value=os.urandom(16)):
boundary = binascii.hexlify(os.urandom(16)).decode("ascii")
- body, content_type = multipart.multipart_encode(data={}, files=files)
+ body, content_type = multipart_encode(data={}, files=files)
assert content_type == f"multipart/form-data; boundary={boundary}"
assert body == (
class TestHeaderParamHTML5Formatting:
def test_unicode(self):
- param = multipart._format_param("filename", "n\u00e4me")
+ param = _format_param("filename", "n\u00e4me")
assert param == b'filename="n\xc3\xa4me"'
def test_ascii(self):
- param = multipart._format_param("filename", b"name")
+ param = _format_param("filename", b"name")
assert param == b'filename="name"'
def test_unicode_escape(self):
- param = multipart._format_param("filename", "hello\\world\u0022")
+ param = _format_param("filename", "hello\\world\u0022")
assert param == b'filename="hello\\\\world%22"'
def test_unicode_with_control_character(self):
- param = multipart._format_param("filename", "hello\x1A\x1B\x1C")
+ param = _format_param("filename", "hello\x1A\x1B\x1C")
assert param == b'filename="hello%1A\x1B%1C"'
import pytest
-from httpx import (
- Client,
- ConnectTimeout,
- PoolLimits,
- PoolTimeout,
- ReadTimeout,
- Timeout,
- WriteTimeout,
-)
+import httpx
async def test_read_timeout(server, backend):
- timeout = Timeout(read_timeout=1e-6)
+ timeout = httpx.Timeout(read_timeout=1e-6)
- async with Client(timeout=timeout, backend=backend) as client:
- with pytest.raises(ReadTimeout):
+ async with httpx.Client(timeout=timeout, backend=backend) as client:
+ with pytest.raises(httpx.ReadTimeout):
await client.get(server.url.copy_with(path="/slow_response"))
async def test_write_timeout(server, backend):
- timeout = Timeout(write_timeout=1e-6)
+ timeout = httpx.Timeout(write_timeout=1e-6)
- async with Client(timeout=timeout, backend=backend) as client:
- with pytest.raises(WriteTimeout):
+ async with httpx.Client(timeout=timeout, backend=backend) as client:
+ with pytest.raises(httpx.WriteTimeout):
data = b"*" * 1024 * 1024 * 100
await client.put(server.url.copy_with(path="/slow_response"), data=data)
async def test_connect_timeout(server, backend):
- timeout = Timeout(connect_timeout=1e-6)
+ timeout = httpx.Timeout(connect_timeout=1e-6)
- async with Client(timeout=timeout, backend=backend) as client:
- with pytest.raises(ConnectTimeout):
+ async with httpx.Client(timeout=timeout, backend=backend) as client:
+ with pytest.raises(httpx.ConnectTimeout):
# See https://stackoverflow.com/questions/100841/
await client.get("http://10.255.255.1/")
async def test_pool_timeout(server, backend):
- pool_limits = PoolLimits(hard_limit=1)
- timeout = Timeout(pool_timeout=1e-4)
+ pool_limits = httpx.PoolLimits(hard_limit=1)
+ timeout = httpx.Timeout(pool_timeout=1e-4)
- async with Client(
+ async with httpx.Client(
pool_limits=pool_limits, timeout=timeout, backend=backend
) as client:
async with client.stream("GET", server.url):
- with pytest.raises(PoolTimeout):
+ with pytest.raises(httpx.PoolTimeout):
await client.get("http://localhost:8000/")