coroutine = self.send
args = [request]
- kwargs = dict(
- stream=True,
- auth=auth,
- allow_redirects=allow_redirects,
- verify=verify,
- cert=cert,
- timeout=timeout,
- )
+ kwargs = {
+ "stream": True,
+ "auth": auth,
+ "allow_redirects": allow_redirects,
+ "verify": verify,
+ "cert": cert,
+ "timeout": timeout,
+ }
async_response = concurrency_backend.run(coroutine, *args, **kwargs)
content = getattr(
set_value = value.encode(self.encoding)
found_indexes = []
- for idx, (item_key, item_value) in enumerate(self._list):
+ for idx, (item_key, _) in enumerate(self._list):
if item_key == set_key:
found_indexes.append(idx)
del_key = key.lower().encode(self.encoding)
pop_indexes = []
- for idx, (item_key, item_value) in enumerate(self._list):
+ for idx, (item_key, _) in enumerate(self._list):
if item_key == del_key:
pop_indexes.append(idx)
def __contains__(self, key: typing.Any) -> bool:
get_header_key = key.lower().encode(self.encoding)
- for header_key, header_value in self._list:
+ for header_key, _ in self._list:
if header_key == get_header_key:
return True
return False
def content(self) -> bytes:
if not hasattr(self, "_content"):
if hasattr(self, "_raw_content"):
- raw_content = getattr(self, "_raw_content") # type: bytes
+ raw_content = self._raw_content # type: ignore
content = self.decoder.decode(raw_content)
content += self.decoder.flush()
self._content = content
"""
Set a cookie value by name. May optionally include domain and path.
"""
- kwargs = dict(
- version=0,
- name=name,
- value=value,
- port=None,
- port_specified=False,
- domain=domain,
- domain_specified=bool(domain),
- domain_initial_dot=domain.startswith("."),
- path=path,
- path_specified=bool(path),
- secure=False,
- expires=None,
- discard=True,
- comment=None,
- comment_url=None,
- rest={"HttpOnly": None},
- rfc2109=False,
- )
+ kwargs = {
+ "version": 0,
+ "name": name,
+ "value": value,
+ "port": None,
+ "port_specified": False,
+ "domain": domain,
+ "domain_specified": bool(domain),
+ "domain_initial_dot": domain.startswith("."),
+ "path": path,
+ "path_specified": bool(path),
+ "secure": False,
+ "expires": None,
+ "discard": True,
+ "comment": None,
+ "comment_url": None,
+ "rest": {"HttpOnly": None},
+ "rfc2109": False,
+ }
cookie = Cookie(**kwargs) # type: ignore
self.jar.set_cookie(cookie)
return (cookie.name for cookie in self.jar)
def __bool__(self) -> bool:
- for cookie in self.jar:
+ for _ in self.jar:
return True
return False
black
cryptography
flake8
+flake8-bugbear
+flake8-comprehensions
isort
mypy
pytest
${PREFIX}autoflake --in-place --recursive httpx tests setup.py
${PREFIX}isort --multi-line=3 --trailing-comma --force-grid-wrap=0 --combine-as --line-width 88 --recursive --apply httpx tests setup.py
${PREFIX}black httpx tests setup.py
-${PREFIX}flake8 --max-line-length=88 --ignore=W503,E203 httpx tests setup.py
+${PREFIX}flake8 --max-line-length=88 --ignore=W503,E203,B305 httpx tests setup.py
${PREFIX}mypy httpx --ignore-missing-imports --disallow-untyped-defs
scripts/clean
@pytest.mark.asyncio
async def test_keepalive_connection_closed_by_server_is_reestablished(server):
"""
- Upon keep-alive connection closed by remote a new connection should be reestablished.
+ Upon keep-alive connection closed by remote a new connection
+ should be reestablished.
"""
async with httpx.ConnectionPool() as http:
response = await http.request("GET", "http://127.0.0.1:8000/")
await response.read()
- await server.shutdown() # shutdown the server to close the keep-alive connection
+ # shutdown the server to close the keep-alive connection
+ await server.shutdown()
await server.startup()
response = await http.request("GET", "http://127.0.0.1:8000/")
@pytest.mark.asyncio
async def test_keepalive_http2_connection_closed_by_server_is_reestablished(server):
"""
- Upon keep-alive connection closed by remote a new connection should be reestablished.
+ Upon keep-alive connection closed by remote a new connection
+ should be reestablished.
"""
async with httpx.ConnectionPool() as http:
response = await http.request("GET", "http://127.0.0.1:8000/")
await response.read()
- await server.shutdown() # shutdown the server to close the keep-alive connection
+ # shutdown the server to close the keep-alive connection
+ await server.shutdown()
await server.startup()
response = await http.request("GET", "http://127.0.0.1:8000/")
def test_json_with_specified_encoding():
- data = dict(greeting="hello", recipient="world")
+ data = {"greeting": "hello", "recipient": "world"}
content = json.dumps(data).encode("utf-16")
headers = {"Content-Type": "application/json, charset=utf-16"}
response = httpx.Response(200, content=content, headers=headers)
def test_json_with_options():
- data = dict(greeting="hello", recipient="world", amount=1)
+ data = {"greeting": "hello", "recipient": "world", "amount": 1}
content = json.dumps(data).encode("utf-16")
headers = {"Content-Type": "application/json, charset=utf-16"}
response = httpx.Response(200, content=content, headers=headers)
def test_json_without_specified_encoding():
- data = dict(greeting="hello", recipient="world")
+ data = {"greeting": "hello", "recipient": "world"}
content = json.dumps(data).encode("utf-32-be")
headers = {"Content-Type": "application/json"}
response = httpx.Response(200, content=content, headers=headers)
def test_json_without_specified_encoding_decode_error():
- data = dict(greeting="hello", recipient="world")
+ data = {"greeting": "hello", "recipient": "world"}
content = json.dumps(data).encode("utf-32-be")
headers = {"Content-Type": "application/json"}
# force incorrect guess from `guess_json_utf` to trigger error
+import pytest
+
from httpx import URL
from httpx.exceptions import InvalidURL
-import pytest
def test_idna_url():
Some basic URL joining tests.
"""
url = URL("https://example.org:123/path/to/somewhere")
- assert url.join('/somewhere-else') == "https://example.org:123/somewhere-else"
- assert url.join('somewhere-else') == "https://example.org:123/path/to/somewhere-else"
- assert url.join('../somewhere-else') == "https://example.org:123/path/somewhere-else"
- assert url.join('../../somewhere-else') == "https://example.org:123/somewhere-else"
+ assert url.join("/somewhere-else") == "https://example.org:123/somewhere-else"
+ assert (
+ url.join("somewhere-else") == "https://example.org:123/path/to/somewhere-else"
+ )
+ assert (
+ url.join("../somewhere-else") == "https://example.org:123/path/somewhere-else"
+ )
+ assert url.join("../../somewhere-else") == "https://example.org:123/somewhere-else"
def test_url_join_rfc3986():
'--{0}\r\nContent-Disposition: form-data; name="d"\r\n\r\nff\r\n'
'--{0}\r\nContent-Disposition: form-data; name="d"\r\n\r\nfff\r\n'
'--{0}\r\nContent-Disposition: form-data; name="f"\r\n\r\n\r\n'
- '--{0}\r\nContent-Disposition: form-data; name="file"; filename="name.txt"\r\n'
+ '--{0}\r\nContent-Disposition: form-data; name="file";'
+ ' filename="name.txt"\r\n'
"Content-Type: text/plain\r\n\r\n<file content>\r\n"
"--{0}--\r\n"
"".format(boundary).encode("ascii")