# (filename, file (or text), content_type)
Tuple[Optional[str], FileContent, Optional[str]],
]
-RequestFiles = Union[Mapping[str, FileTypes], List[Tuple[str, FileTypes]]]
+RequestFiles = Union[Mapping[str, FileTypes], Sequence[Tuple[str, FileTypes]]]
${PREFIX}black --check --diff --target-version=py36 $SOURCE_FILES
${PREFIX}flake8 $SOURCE_FILES
-${PREFIX}mypy httpx
+${PREFIX}mypy $SOURCE_FILES
${PREFIX}isort --check --diff --project=httpx $SOURCE_FILES
def test_asgi_dispatch_deprecated():
+ async def app(scope, receive, send):
+ pass
+
with pytest.warns(DeprecationWarning) as record:
- ASGIDispatch(None)
+ ASGIDispatch(app)
assert len(record) == 1
assert (
Auth,
Client,
DigestAuth,
- Headers,
ProtocolError,
Request,
RequestBodyUnavailable,
async def request(
self,
method: bytes,
- url: typing.Tuple[bytes, bytes, int, bytes],
- headers: typing.List[typing.Tuple[bytes, bytes]],
- stream: ContentStream,
+ url: typing.Tuple[bytes, bytes, typing.Optional[int], bytes],
+ headers: typing.List[typing.Tuple[bytes, bytes]] = None,
+ stream: httpcore.AsyncByteStream = None,
timeout: typing.Dict[str, typing.Optional[float]] = None,
) -> typing.Tuple[
bytes, int, bytes, typing.List[typing.Tuple[bytes, bytes]], ContentStream
]:
if self._response_count < self.send_response_after_attempt:
- return self.challenge_send(method, url, headers, stream)
+ assert headers is not None
+ return self.challenge_send(method, headers)
authorization = get_header_value(headers, "Authorization")
body = JSONStream({"auth": authorization})
return b"HTTP/1.1", 200, b"", [], body
def challenge_send(
- self, method: bytes, url: URL, headers: Headers, stream: ContentStream,
- ) -> typing.Tuple[int, bytes, Headers, ContentStream]:
+ self, method: bytes, headers: typing.List[typing.Tuple[bytes, bytes]],
+ ) -> typing.Tuple[
+ bytes, int, bytes, typing.List[typing.Tuple[bytes, bytes]], ContentStream
+ ]:
self._response_count += 1
nonce = (
hashlib.sha256(os.urandom(8)).hexdigest()
async def test_auth_invalid_type() -> None:
url = "https://example.org/"
client = AsyncClient(
- transport=AsyncMockTransport(), auth="not a tuple, not a callable",
+ transport=AsyncMockTransport(),
+ auth="not a tuple, not a callable", # type: ignore
)
with pytest.raises(TypeError):
await client.get(url)
def test_wsgi_dispatch_deprecated():
+ def app(start_response, environ):
+ pass
+
with pytest.warns(DeprecationWarning) as record:
- WSGIDispatch(None)
+ WSGIDispatch(app)
assert len(record) == 1
assert (
async def request(
self,
method: bytes,
- url: typing.Tuple[bytes, bytes, int, bytes],
- headers: typing.List[typing.Tuple[bytes, bytes]],
- stream: ContentStream,
+ url: typing.Tuple[bytes, bytes, typing.Optional[int], bytes],
+ headers: typing.List[typing.Tuple[bytes, bytes]] = None,
+ stream: httpcore.AsyncByteStream = None,
timeout: typing.Dict[str, typing.Optional[float]] = None,
) -> typing.Tuple[
bytes, int, bytes, typing.List[typing.Tuple[bytes, bytes]], ContentStream
]:
host, scheme, port, path = url
+ body: ContentStream
if path.startswith(b"/echo_cookies"):
cookie = get_header_value(headers, "cookie")
body = JSONStream({"cookies": cookie})
async def request(
self,
method: bytes,
- url: typing.Tuple[bytes, bytes, int, bytes],
- headers: typing.List[typing.Tuple[bytes, bytes]],
- stream: ContentStream,
+ url: typing.Tuple[bytes, bytes, typing.Optional[int], bytes],
+ headers: typing.List[typing.Tuple[bytes, bytes]] = None,
+ stream: httpcore.AsyncByteStream = None,
timeout: typing.Dict[str, typing.Optional[float]] = None,
) -> typing.Tuple[
bytes, int, bytes, typing.List[typing.Tuple[bytes, bytes]], ContentStream
]:
+ assert headers is not None
headers_dict = {
key.decode("ascii"): value.decode("ascii") for key, value in headers
}
def test_client_headers():
client = AsyncClient()
- client.headers = {"a": "b"}
+ client.headers = {"a": "b"} # type: ignore
assert isinstance(client.headers, Headers)
assert client.headers["A"] == "b"
def test_client_cookies():
client = AsyncClient()
- client.cookies = {"a": "b"}
+ client.cookies = {"a": "b"} # type: ignore
assert isinstance(client.cookies, Cookies)
mycookies = list(client.cookies.jar)
assert len(mycookies) == 1
+import httpcore
import pytest
import httpx
for proxy_key, url in expected_proxies:
assert proxy_key in client.proxies
- assert client.proxies[proxy_key].proxy_origin == httpx.URL(url).raw[:3]
+ proxy = client.proxies[proxy_key]
+ assert isinstance(proxy, httpcore.AsyncHTTPProxy)
+ assert proxy.proxy_origin == httpx.URL(url).raw[:3]
assert len(expected_proxies) == len(client.proxies)
if expected is None:
assert transport is client.transport
else:
+ assert isinstance(transport, httpcore.AsyncHTTPProxy)
assert transport.proxy_origin == httpx.URL(expected).raw[:3]
import httpcore
import pytest
-from httpx import URL, AsyncClient, Headers, QueryParams
+from httpx import URL, AsyncClient, QueryParams
from httpx._content_streams import ContentStream, JSONStream
async def request(
self,
method: bytes,
- url: typing.Tuple[bytes, bytes, int, bytes],
- headers: typing.List[typing.Tuple[bytes, bytes]],
- stream: ContentStream,
+ url: typing.Tuple[bytes, bytes, typing.Optional[int], bytes],
+ headers: typing.List[typing.Tuple[bytes, bytes]] = None,
+ stream: httpcore.AsyncByteStream = None,
timeout: typing.Dict[str, typing.Optional[float]] = None,
) -> typing.Tuple[
bytes, int, bytes, typing.List[typing.Tuple[bytes, bytes]], ContentStream
]:
- headers = Headers()
body = JSONStream({"ok": "ok"})
- return b"HTTP/1.1", 200, b"OK", headers, body
+ return b"HTTP/1.1", 200, b"OK", [], body
def test_client_queryparams():
assert client.params["a"] == "b"
client = AsyncClient()
- client.params = "a=b"
+ client.params = "a=b" # type: ignore
assert isinstance(client.params, QueryParams)
assert client.params["a"] == "b"
headers_dict = {
key.decode("ascii"): value.decode("ascii") for key, value in headers
}
- content = ByteStream(json.dumps({"headers": headers_dict}).encode())
- return b"HTTP/1.1", 200, b"OK", [], content
+ stream = ByteStream(json.dumps({"headers": headers_dict}).encode())
+ return b"HTTP/1.1", 200, b"OK", [], stream
elif path == b"/redirect_body":
code = codes.PERMANENT_REDIRECT
headers_dict = {
key.decode("ascii"): value.decode("ascii") for key, value in headers
}
- body = ByteStream(
+ stream = ByteStream(
json.dumps({"body": content.decode(), "headers": headers_dict}).encode()
)
- return b"HTTP/1.1", 200, b"OK", [], body
+ return b"HTTP/1.1", 200, b"OK", [], stream
elif path == b"/cross_subdomain":
host = get_header_value(headers, "host")
async def request(
self,
method: bytes,
- url: typing.Tuple[bytes, bytes, int, bytes],
- headers: typing.List[typing.Tuple[bytes, bytes]],
- stream: ContentStream,
+ url: typing.Tuple[bytes, bytes, typing.Optional[int], bytes],
+ headers: typing.List[typing.Tuple[bytes, bytes]] = None,
+ stream: httpcore.AsyncByteStream = None,
timeout: typing.Dict[str, typing.Optional[float]] = None,
) -> typing.Tuple[
bytes, int, bytes, typing.List[typing.Tuple[bytes, bytes]], ContentStream
]
return b"HTTP/1.1", status_code, b"See Other", headers, ByteStream(b"")
- elif path == b"/logout":
+ else:
+ assert path == b"/logout"
status_code = codes.SEE_OTHER
headers = [
(b"location", b"/"),
@pytest.fixture(scope="function", autouse=True)
-def clean_environ() -> typing.Dict[str, typing.Any]:
+def clean_environ():
"""Keeps os.environ clean for every test without having to mock os.environ"""
original_environ = os.environ.copy()
os.environ.clear()
assert response.http_version == "HTTP/1.1"
-@pytest.mark.asyncio
-async def test_get_invalid_url(server):
+def test_get_invalid_url():
with pytest.raises(httpx.InvalidURL):
- await httpx.get("invalid://example.org")
+ httpx.get("invalid://example.org")
def test_load_ssl_config_verify_directory():
path = Path(certifi.where()).parent
- ssl_config = SSLConfig(verify=path)
+ ssl_config = SSLConfig(verify=str(path))
context = ssl_config.ssl_context
assert context.verify_mode == ssl.VerifyMode.CERT_REQUIRED
assert context.check_hostname is True
ssl_config = SSLConfig(trust_env=True)
- assert ssl_config.ssl_context.keylog_filename is None
+ assert ssl_config.ssl_context.keylog_filename is None # type: ignore
filename = str(tmpdir.join("test.log"))
ssl_config = SSLConfig(trust_env=True)
- assert ssl_config.ssl_context.keylog_filename == filename
+ assert ssl_config.ssl_context.keylog_filename == filename # type: ignore
ssl_config = SSLConfig(trust_env=False)
- assert ssl_config.ssl_context.keylog_filename is None
+ assert ssl_config.ssl_context.keylog_filename is None # type: ignore
@pytest.mark.parametrize(
def test_invalid_argument():
with pytest.raises(TypeError):
- encode(123)
+ encode(123) # type: ignore
@pytest.mark.asyncio
import pytest
import httpx
-from httpx._content_streams import AsyncIteratorStream, encode
+from httpx._content_streams import AsyncIteratorStream, MultipartStream, encode
from httpx._utils import format_form_param
async def request(
self,
method: bytes,
- url: typing.Tuple[bytes, bytes, int, bytes],
+ url: typing.Tuple[bytes, bytes, typing.Optional[int], bytes],
headers: typing.List[typing.Tuple[bytes, bytes]] = None,
stream: httpcore.AsyncByteStream = None,
timeout: typing.Dict[str, typing.Optional[float]] = None,
typing.List[typing.Tuple[bytes, bytes]],
httpcore.AsyncByteStream,
]:
+ assert stream is not None
content = AsyncIteratorStream(aiterator=(part async for part in stream))
return b"HTTP/1.1", 200, b"OK", [], content
# bit grungy, but sufficient just for our testing purposes.
boundary = response.request.headers["Content-Type"].split("boundary=")[-1]
content_length = response.request.headers["Content-Length"]
- pdict = {"boundary": boundary.encode("ascii"), "CONTENT-LENGTH": content_length}
+ pdict: dict = {
+ "boundary": boundary.encode("ascii"),
+ "CONTENT-LENGTH": content_length,
+ }
multipart = cgi.parse_multipart(io.BytesIO(response.content), pdict)
# Note that the expected return type for text fields
# bit grungy, but sufficient just for our testing purposes.
boundary = response.request.headers["Content-Type"].split("boundary=")[-1]
content_length = response.request.headers["Content-Length"]
- pdict = {"boundary": boundary.encode("ascii"), "CONTENT-LENGTH": content_length}
+ pdict: dict = {
+ "boundary": boundary.encode("ascii"),
+ "CONTENT-LENGTH": content_length,
+ }
multipart = cgi.parse_multipart(io.BytesIO(response.content), pdict)
# Note that the expected return type for text fields
boundary = os.urandom(16).hex()
stream = encode(data=data, files=files)
+ assert isinstance(stream, MultipartStream)
assert stream.can_replay()
assert stream.content_type == f"multipart/form-data; boundary={boundary}"
boundary = os.urandom(16).hex()
stream = encode(data={}, files=files)
+ assert isinstance(stream, MultipartStream)
assert stream.can_replay()
assert stream.content_type == f"multipart/form-data; boundary={boundary}"
boundary = os.urandom(16).hex()
stream = encode(data={}, files=files)
+ assert isinstance(stream, MultipartStream)
assert stream.can_replay()
assert stream.content_type == f"multipart/form-data; boundary={boundary}"
boundary = os.urandom(16).hex()
stream = encode(data={}, files=files)
+ assert isinstance(stream, MultipartStream)
assert stream.can_replay()
assert stream.content_type == f"multipart/form-data; boundary={boundary}"
yield b"Hello"
yield b"World"
- fileobj = IteratorIO(data())
+ fileobj: typing.Any = IteratorIO(data())
files = {"file": fileobj}
stream = encode(files=files, boundary=b"+++")
assert not stream.can_replay()
def test_lowercase_status_code():
- assert httpx.codes.not_found == 404
+ assert httpx.codes.not_found == 404 # type: ignore
def test_reason_phrase_for_status_code():