max-line-length = 120
[mypy]
-disallow_untyped_defs = True
-disallow_any_generics = True
ignore_missing_imports = True
-no_implicit_optional = True
-show_error_codes = True
-warn_unused_ignores = True
-warn_unused_configs = True
-disallow_subclassing_any = True
-check_untyped_defs = True
-disallow_untyped_decorators = True
-warn_redundant_casts = True
-strict_concatenate = True
-disallow_incomplete_defs = True
-no_implicit_reexport = True
-warn_return_any = True
-strict_equality = True
+strict = True
[mypy-tests.*]
disallow_untyped_defs = False
@pytest.mark.usefixtures("async_environment")
async def test_stream_request(server):
- async def hello_world():
+ async def hello_world() -> typing.AsyncIterator[bytes]:
yield b"Hello, "
yield b"world!"
@pytest.mark.usefixtures("async_environment")
async def test_cannot_stream_sync_request(server):
- def hello_world(): # pragma: no cover
+ def hello_world() -> typing.Iterator[bytes]: # pragma: no cover
yield b"Hello, "
yield b"world!"
@pytest.mark.usefixtures("async_environment")
async def test_context_managed_transport():
class Transport(httpx.AsyncBaseTransport):
- def __init__(self):
- self.events = []
+ def __init__(self) -> None:
+ self.events: typing.List[str] = []
async def aclose(self):
# The base implementation of httpx.AsyncBaseTransport just
requires_response_body = True
- def __init__(self, token):
+ def __init__(self, token: str) -> None:
self.token = token
def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
auth = DigestAuth(username="user", password="password123")
app = DigestApp()
- async def streaming_body():
+ async def streaming_body() -> typing.AsyncIterator[bytes]:
yield b"Example request body" # pragma: no cover
async with httpx.AsyncClient(transport=ConsumeBodyTransport(app)) as client:
def test_cannot_stream_async_request(server):
- async def hello_world(): # pragma: no cover
+ async def hello_world() -> typing.AsyncIterator[bytes]: # pragma: no cover
yield b"Hello, "
yield b"world!"
def test_context_managed_transport():
class Transport(httpx.BaseTransport):
- def __init__(self):
- self.events = []
+ def __init__(self) -> None:
+ self.events: typing.List[str] = []
def close(self):
# The base implementation of httpx.BaseTransport just
+import typing
+
import pytest
import httpx
client = httpx.Client(transport=ConsumeBodyTransport(redirects))
url = "https://example.org/redirect_body"
- def streaming_body():
+ def streaming_body() -> typing.Iterator[bytes]:
yield b"Example request body" # pragma: no cover
with pytest.raises(httpx.StreamConsumed):
from httpx import URL
from tests.concurrency import sleep
+if typing.TYPE_CHECKING: # pragma: no cover
+ from httpx._transports.asgi import _Receive, _Send
+
ENVIRONMENT_VARIABLES = {
"SSL_CERT_FILE",
"SSL_CERT_DIR",
os.environ.update(original_environ)
-async def app(scope, receive, send):
+_Scope = typing.Dict[str, typing.Any]
+
+
+async def app(scope: _Scope, receive: "_Receive", send: "_Send") -> None:
assert scope["type"] == "http"
if scope["path"].startswith("/slow_response"):
await slow_response(scope, receive, send)
await hello_world(scope, receive, send)
-async def hello_world(scope, receive, send):
+async def hello_world(scope: _Scope, receive: "_Receive", send: "_Send") -> None:
await send(
{
"type": "http.response.start",
await send({"type": "http.response.body", "body": b"Hello, world!"})
-async def hello_world_json(scope, receive, send):
+async def hello_world_json(scope: _Scope, receive: "_Receive", send: "_Send") -> None:
await send(
{
"type": "http.response.start",
await send({"type": "http.response.body", "body": b'{"Hello": "world!"}'})
-async def slow_response(scope, receive, send):
+async def slow_response(scope: _Scope, receive: "_Receive", send: "_Send") -> None:
await sleep(1.0)
await send(
{
await send({"type": "http.response.body", "body": b"Hello, world!"})
-async def status_code(scope, receive, send):
+async def status_code(scope: _Scope, receive: "_Receive", send: "_Send") -> None:
status_code = int(scope["path"].replace("/status/", ""))
await send(
{
await send({"type": "http.response.body", "body": b"Hello, world!"})
-async def echo_body(scope, receive, send):
+async def echo_body(scope: _Scope, receive: "_Receive", send: "_Send") -> None:
body = b""
more_body = True
await send({"type": "http.response.body", "body": body})
-async def echo_binary(scope, receive, send):
+async def echo_binary(scope: _Scope, receive: "_Receive", send: "_Send") -> None:
body = b""
more_body = True
await send({"type": "http.response.body", "body": body})
-async def echo_headers(scope, receive, send):
+async def echo_headers(scope: _Scope, receive: "_Receive", send: "_Send") -> None:
body = {
name.capitalize().decode(): value.decode()
for name, value in scope.get("headers", [])
await send({"type": "http.response.body", "body": json.dumps(body).encode()})
-async def redirect_301(scope, receive, send):
+async def redirect_301(scope: _Scope, receive: "_Receive", send: "_Send") -> None:
await send(
{"type": "http.response.start", "status": 301, "headers": [[b"location", b"/"]]}
)
while not self.started:
await sleep(0.2)
- async def watch_restarts(self): # pragma: no cover
+ async def watch_restarts(self) -> None: # pragma: no cover
while True:
if self.should_exit:
return
def test_generator_with_transfer_encoding_header():
- def content():
+ def content() -> typing.Iterator[bytes]:
yield b"test 123" # pragma: no cover
request = httpx.Request("POST", "http://example.org", content=content())
def test_generator_with_content_length_header():
- def content():
+ def content() -> typing.Iterator[bytes]:
yield b"test 123" # pragma: no cover
headers = {"Content-Length": "8"}
def test_cannot_access_streaming_content_without_read():
# Ensure that streaming requests
- def streaming_body(): # pragma: no cover
- yield ""
+ def streaming_body() -> typing.Iterator[bytes]: # pragma: no cover
+ yield b""
request = httpx.Request("POST", "http://example.org", content=streaming_body())
with pytest.raises(httpx.RequestNotRead):
def test_transfer_encoding_header():
- async def streaming_body(data):
+ async def streaming_body(data: bytes) -> typing.AsyncIterator[bytes]:
yield data # pragma: no cover
data = streaming_body(b"test 123")
See https://github.com/encode/httpx/issues/1168
"""
- def streaming_body(data):
+ def streaming_body(data: bytes) -> typing.Iterator[bytes]:
yield data # pragma: no cover
data = streaming_body(b"abcd")
def test_override_content_length_header():
- async def streaming_body(data):
+ async def streaming_body(data: bytes) -> typing.AsyncIterator[bytes]:
yield data # pragma: no cover
data = streaming_body(b"test 123")
@pytest.mark.asyncio
async def test_request_async_streaming_content_picklable():
- async def streaming_body(data):
+ async def streaming_body(data: bytes) -> typing.AsyncIterator[bytes]:
yield data
data = streaming_body(b"test 123")
def test_request_generator_content_picklable():
- def content():
+ def content() -> typing.Iterator[bytes]:
yield b"test 123" # pragma: no cover
request = httpx.Request("POST", "http://example.org", content=content())
import json
import pickle
+import typing
import chardet
import pytest
yield b"world!"
-def streaming_body():
+def streaming_body() -> typing.Iterator[bytes]:
yield b"Hello, "
yield b"world!"
-async def async_streaming_body():
+async def async_streaming_body() -> typing.AsyncIterator[bytes]:
yield b"Hello, "
yield b"world!"
def test_iter_raw_doesnt_return_empty_chunks():
- def streaming_body_with_empty_chunks():
+ def streaming_body_with_empty_chunks() -> typing.Iterator[bytes]:
yield b"Hello, "
yield b""
yield b"world!"
def test_iter_bytes_doesnt_return_empty_chunks():
- def streaming_body_with_empty_chunks():
+ def streaming_body_with_empty_chunks() -> typing.Iterator[bytes]:
yield b"Hello, "
yield b""
yield b"world!"
def test_generator_with_transfer_encoding_header():
- def content():
+ def content() -> typing.Iterator[bytes]:
yield b"test 123" # pragma: no cover
response = httpx.Response(200, content=content())
def test_generator_with_content_length_header():
- def content():
+ def content() -> typing.Iterator[bytes]:
yield b"test 123" # pragma: no cover
headers = {"Content-Length": "8"}
+import typing
+
import pytest
import httpx
def test_post_byte_iterator(server):
- def data():
+ def data() -> typing.Iterator[bytes]:
yield b"Hello"
yield b", "
yield b"world!"
@pytest.mark.asyncio
async def test_async_bytesio_content():
class AsyncBytesIO:
- def __init__(self, content: bytes):
+ def __init__(self, content: bytes) -> None:
self._idx = 0
self._content = content
@pytest.mark.asyncio
async def test_iterator_content():
- def hello_world():
+ def hello_world() -> typing.Iterator[bytes]:
yield b"Hello, "
yield b"world!"
# Support 'data' for compat with requests.
with pytest.warns(DeprecationWarning):
- headers, stream = encode_request(data=hello_world())
+ headers, stream = encode_request(data=hello_world()) # type: ignore
assert isinstance(stream, typing.Iterable)
assert not isinstance(stream, typing.AsyncIterable)
@pytest.mark.asyncio
async def test_aiterator_content():
- async def hello_world():
+ async def hello_world() -> typing.AsyncIterator[bytes]:
yield b"Hello, "
yield b"world!"
# Support 'data' for compat with requests.
with pytest.warns(DeprecationWarning):
- headers, stream = encode_request(data=hello_world())
+ headers, stream = encode_request(data=hello_world()) # type: ignore
assert not isinstance(stream, typing.Iterable)
assert isinstance(stream, typing.AsyncIterable)
@pytest.mark.asyncio
async def test_response_iterator_content():
- def hello_world():
+ def hello_world() -> typing.Iterator[bytes]:
yield b"Hello, "
yield b"world!"
@pytest.mark.asyncio
async def test_response_aiterator_content():
- async def hello_world():
+ async def hello_world() -> typing.AsyncIterator[bytes]:
yield b"Hello, "
yield b"world!"
+import typing
import zlib
import chardet
body = b"test 123"
compressor = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
- async def compress(body):
+ async def compress(body: bytes) -> typing.AsyncIterator[bytes]:
yield compressor.compress(body)
yield compressor.flush()
)
@pytest.mark.asyncio
async def test_text_decoder_with_autodetect(data, encoding):
- async def iterator():
+ async def iterator() -> typing.AsyncIterator[bytes]:
nonlocal data
for chunk in data:
yield chunk
@pytest.mark.asyncio
async def test_text_decoder_known_encoding():
- async def iterator():
+ async def iterator() -> typing.AsyncIterator[bytes]:
yield b"\x83g"
yield b"\x83"
yield b"\x89\x83x\x83\x8b"
import os
+import typing
from click.testing import CliRunner
import httpx
+from httpx import main
+if typing.TYPE_CHECKING: # pragma: no cover
+ # don't let mypy be misled by the fallback defined in httpx/__init__.py
+ from httpx._main import main # noqa: F811
-def splitlines(output):
+
+def splitlines(output: str) -> typing.Iterable[str]:
return [line.strip() for line in output.splitlines()]
-def remove_date_header(lines):
+def remove_date_header(lines: typing.Iterable[str]) -> typing.Iterable[str]:
return [line for line in lines if not line.startswith("date:")]
def test_help():
runner = CliRunner()
- result = runner.invoke(httpx.main, ["--help"])
+ result = runner.invoke(main, ["--help"])
assert result.exit_code == 0
assert "A next generation HTTP client." in result.output
def test_get(server):
url = str(server.url)
runner = CliRunner()
- result = runner.invoke(httpx.main, [url])
+ result = runner.invoke(main, [url])
assert result.exit_code == 0
assert remove_date_header(splitlines(result.output)) == [
"HTTP/1.1 200 OK",
def test_json(server):
url = str(server.url.copy_with(path="/json"))
runner = CliRunner()
- result = runner.invoke(httpx.main, [url])
+ result = runner.invoke(main, [url])
assert result.exit_code == 0
assert remove_date_header(splitlines(result.output)) == [
"HTTP/1.1 200 OK",
url = str(server.url.copy_with(path="/echo_binary"))
runner = CliRunner()
content = "Hello, world!"
- result = runner.invoke(httpx.main, [url, "-c", content])
+ result = runner.invoke(main, [url, "-c", content])
assert result.exit_code == 0
assert remove_date_header(splitlines(result.output)) == [
"HTTP/1.1 200 OK",
def test_redirects(server):
url = str(server.url.copy_with(path="/redirect_301"))
runner = CliRunner()
- result = runner.invoke(httpx.main, [url])
+ result = runner.invoke(main, [url])
assert result.exit_code == 1
assert remove_date_header(splitlines(result.output)) == [
"HTTP/1.1 301 Moved Permanently",
def test_follow_redirects(server):
url = str(server.url.copy_with(path="/redirect_301"))
runner = CliRunner()
- result = runner.invoke(httpx.main, [url, "--follow-redirects"])
+ result = runner.invoke(main, [url, "--follow-redirects"])
assert result.exit_code == 0
assert remove_date_header(splitlines(result.output)) == [
"HTTP/1.1 301 Moved Permanently",
def test_post(server):
url = str(server.url.copy_with(path="/echo_body"))
runner = CliRunner()
- result = runner.invoke(httpx.main, [url, "-m", "POST", "-j", '{"hello": "world"}'])
+ result = runner.invoke(main, [url, "-m", "POST", "-j", '{"hello": "world"}'])
assert result.exit_code == 0
assert remove_date_header(splitlines(result.output)) == [
"HTTP/1.1 200 OK",
def test_verbose(server):
url = str(server.url)
runner = CliRunner()
- result = runner.invoke(httpx.main, [url, "-v"])
+ result = runner.invoke(main, [url, "-v"])
assert result.exit_code == 0
assert remove_date_header(splitlines(result.output)) == [
"* Connecting to '127.0.0.1'",
def test_auth(server):
url = str(server.url)
runner = CliRunner()
- result = runner.invoke(httpx.main, [url, "-v", "--auth", "username", "password"])
+ result = runner.invoke(main, [url, "-v", "--auth", "username", "password"])
print(result.output)
assert result.exit_code == 0
assert remove_date_header(splitlines(result.output)) == [
url = str(server.url)
runner = CliRunner()
with runner.isolated_filesystem():
- runner.invoke(httpx.main, [url, "--download", "index.txt"])
+ runner.invoke(main, [url, "--download", "index.txt"])
assert os.path.exists("index.txt")
with open("index.txt", "r") as input_file:
assert input_file.read() == "Hello, world!"
def test_errors():
runner = CliRunner()
- result = runner.invoke(httpx.main, ["invalid://example.org"])
+ result = runner.invoke(main, ["invalid://example.org"])
assert result.exit_code == 1
assert splitlines(result.output) == [
"UnsupportedProtocol: Request URL has an unsupported protocol 'invalid://'.",
import sys
+import typing
import wsgiref.validate
from functools import partial
from io import StringIO
import httpx
+if typing.TYPE_CHECKING: # pragma: no cover
+ from _typeshed.wsgi import StartResponse, WSGIApplication, WSGIEnvironment
-def application_factory(output):
+
+def application_factory(output: typing.Iterable[bytes]) -> "WSGIApplication":
def application(environ, start_response):
status = "200 OK"
return wsgiref.validate.validator(application)
-def echo_body(environ, start_response):
+def echo_body(
+ environ: "WSGIEnvironment", start_response: "StartResponse"
+) -> typing.Iterable[bytes]:
status = "200 OK"
output = environ["wsgi.input"].read()
return [output]
-def echo_body_with_response_stream(environ, start_response):
+def echo_body_with_response_stream(
+ environ: "WSGIEnvironment", start_response: "StartResponse"
+) -> typing.Iterable[bytes]:
status = "200 OK"
response_headers = [("Content-Type", "text/plain")]
start_response(status, response_headers)
- def output_generator(f):
+ def output_generator(f: typing.IO[bytes]) -> typing.Iterator[bytes]:
while True:
output = f.read(2)
if not output:
return output_generator(f=environ["wsgi.input"])
-def raise_exc(environ, start_response, exc=ValueError):
+def raise_exc(
+ environ: "WSGIEnvironment",
+ start_response: "StartResponse",
+ exc: typing.Type[Exception] = ValueError,
+) -> typing.Iterable[bytes]:
status = "500 Server Error"
output = b"Nope!"
raise exc()
except exc:
exc_info = sys.exc_info()
- start_response(status, response_headers, exc_info=exc_info)
+ start_response(status, response_headers, exc_info)
return [output]