try:
from ._main import main
-except ImportError: # pragma: nocover
+except ImportError: # pragma: no cover
def main() -> None: # type: ignore
import sys
if http2:
try:
import h2 # noqa
- except ImportError: # pragma: nocover
+ except ImportError: # pragma: no cover
raise ImportError(
"Using http2=True, but the 'h2' package is not installed. "
"Make sure to install httpx using `pip install httpx[http2]`."
if http2:
try:
import h2 # noqa
- except ImportError: # pragma: nocover
+ except ImportError: # pragma: no cover
raise ImportError(
"Using http2=True, but the 'h2' package is not installed. "
"Make sure to install httpx using `pip install httpx[http2]`."
# The CFFI bindings in `brotlicffi` are recommended for PyPy and everything else.
try:
import brotlicffi as brotli
-except ImportError: # pragma: nocover
+except ImportError: # pragma: no cover
try:
import brotli
except ImportError:
class UnsetType:
- pass # pragma: nocover
+ pass # pragma: no cover
UNSET = UnsetType()
# AttributeError if only read-only access is implemented.
try:
context.post_handshake_auth = True # type: ignore
- except AttributeError: # pragma: nocover
+ except AttributeError: # pragma: no cover
pass
# Disable using 'commonName' for SSLContext.check_hostname
# when the 'subjectAltName' extension isn't available.
try:
context.hostname_checks_common_name = False # type: ignore
- except AttributeError: # pragma: nocover
+ except AttributeError: # pragma: no cover
pass
if ca_bundle_path.is_file():
alpn_idents = ["http/1.1", "h2"] if self.http2 else ["http/1.1"]
context.set_alpn_protocols(alpn_idents)
- if hasattr(context, "keylog_filename"): # pragma: nocover (Available in 3.8+)
+ if hasattr(context, "keylog_filename"): # pragma: no cover (Available in 3.8+)
keylogfile = os.environ.get("SSLKEYLOGFILE")
if keylogfile and self.trust_env:
context.keylog_filename = keylogfile # type: ignore
async def __aiter__(self) -> AsyncIterator[bytes]:
raise StreamClosed()
- yield b"" # pragma: nocover
+ yield b"" # pragma: no cover
def encode_content(
class ContentDecoder:
def decode(self, data: bytes) -> bytes:
- raise NotImplementedError() # pragma: nocover
+ raise NotImplementedError() # pragma: no cover
def flush(self) -> bytes:
- raise NotImplementedError() # pragma: nocover
+ raise NotImplementedError() # pragma: no cover
class IdentityDecoder(ContentDecoder):
def flush(self) -> bytes:
try:
return self.decompressor.flush()
- except zlib.error as exc: # pragma: nocover
+ except zlib.error as exc: # pragma: no cover
raise DecodingError(str(exc)) from exc
def flush(self) -> bytes:
try:
return self.decompressor.flush()
- except zlib.error as exc: # pragma: nocover
+ except zlib.error as exc: # pragma: no cover
raise DecodingError(str(exc)) from exc
"""
def __init__(self) -> None:
- if brotli is None: # pragma: nocover
+ if brotli is None: # pragma: no cover
raise ImportError(
"Using 'BrotliDecoder', but neither of the 'brotlicffi' or 'brotli' "
"packages have been installed. "
self.seen_data = False
if hasattr(self.decompressor, "decompress"):
# The 'brotlicffi' package.
- self._decompress = self.decompressor.decompress # pragma: nocover
+ self._decompress = self.decompressor.decompress # pragma: no cover
else:
# The 'brotli' package.
- self._decompress = self.decompressor.process # pragma: nocover
+ self._decompress = self.decompressor.process # pragma: no cover
def decode(self, data: bytes) -> bytes:
if not data:
# As the decompressor decompresses eagerly, this
# will never actually emit any data. However, it will potentially throw
# errors if a truncated or damaged data stream has been used.
- self.decompressor.finish() # pragma: nocover
+ self.decompressor.finish() # pragma: no cover
return b""
- except brotli.error as exc: # pragma: nocover
+ except brotli.error as exc: # pragma: no cover
raise DecodingError(str(exc)) from exc
if brotli is None:
- SUPPORTED_DECODERS.pop("br") # pragma: nocover
+ SUPPORTED_DECODERS.pop("br") # pragma: no cover
import typing
if typing.TYPE_CHECKING:
- from ._models import Request, Response # pragma: nocover
+ from ._models import Request, Response # pragma: no cover
class HTTPError(Exception):
mime_type, _, _ = content_type.partition(";")
try:
return pygments.lexers.get_lexer_for_mimetype(mime_type.strip()).name
- except pygments.util.ClassNotFound: # pragma: nocover
+ except pygments.util.ClassNotFound: # pragma: no cover
pass
- return "" # pragma: nocover
+ return "" # pragma: no cover
def format_request_headers(request: httpcore.Request, http2: bool = False) -> str:
try:
data = response.json()
text = json.dumps(data, indent=4)
- except ValueError: # pragma: nocover
+ except ValueError: # pragma: no cover
text = response.text
else:
text = response.text
console.print(f"<{len(response.content)} bytes of binary data>")
-def format_certificate(cert: dict) -> str: # pragma: nocover
+def format_certificate(cert: dict) -> str: # pragma: no cover
lines = []
for key, value in cert.items():
if isinstance(value, (list, tuple)):
stream = info["return_value"]
server_addr = stream.get_extra_info("server_addr")
console.print(f"* Connected to {server_addr[0]!r} on port {server_addr[1]}")
- elif name == "connection.start_tls.complete" and verbose: # pragma: nocover
+ elif name == "connection.start_tls.complete" and verbose: # pragma: no cover
stream = info["return_value"]
ssl_object = stream.get_extra_info("ssl_object")
version = ssl_object.version()
elif name == "http11.send_request_headers.started" and verbose:
request = info["request"]
print_request_headers(request, http2=False)
- elif name == "http2.send_request_headers.started" and verbose: # pragma: nocover
+ elif name == "http2.send_request_headers.started" and verbose: # pragma: no cover
request = info["request"]
print_request_headers(request, http2=True)
elif name == "http11.receive_response_headers.complete":
http_version, status, reason_phrase, headers = info["return_value"]
print_response_headers(http_version, status, reason_phrase, headers)
- elif name == "http2.receive_response_headers.complete": # pragma: nocover
+ elif name == "http2.receive_response_headers.complete": # pragma: no cover
status, headers = info["return_value"]
http_version = b"HTTP/2"
reason_phrase = None
try:
return json.loads(value)
- except json.JSONDecodeError: # pragma: nocover
+ except json.JSONDecodeError: # pragma: no cover
raise click.BadParameter("Not valid JSON")
return None
username, password = value
- if password == "-": # pragma: nocover
+ if password == "-": # pragma: no cover
password = click.prompt("Password", hide_input=True)
return (username, password)
yield chunk
decoded = decoder.flush()
for chunk in chunker.decode(decoded):
- yield chunk # pragma: nocover
+ yield chunk # pragma: no cover
for chunk in chunker.flush():
yield chunk
yield chunk
decoded = decoder.flush()
for chunk in chunker.decode(decoded):
- yield chunk # pragma: nocover
+ yield chunk # pragma: no cover
for chunk in chunker.flush():
yield chunk
"""
raise NotImplementedError(
"The 'handle_request' method must be implemented."
- ) # pragma: nocover
+ ) # pragma: no cover
def close(self) -> None:
pass
) -> Response:
raise NotImplementedError(
"The 'handle_async_request' method must be implemented."
- ) # pragma: nocover
+ ) # pragma: no cover
async def aclose(self) -> None:
pass
if mapped_exc is None or issubclass(to_exc, mapped_exc):
mapped_exc = to_exc
- if mapped_exc is None: # pragma: nocover
+ if mapped_exc is None: # pragma: no cover
raise
message = str(exc)
elif proxy.url.scheme == "socks5":
try:
import socksio # noqa
- except ImportError: # pragma: nocover
+ except ImportError: # pragma: no cover
raise ImportError(
"Using SOCKS proxy, but the 'socksio' package is not installed. "
"Make sure to install httpx using `pip install httpx[socks]`."
http1=http1,
http2=http2,
)
- else: # pragma: nocover
+ else: # pragma: no cover
raise ValueError(
f"Proxy protocol must be either 'http', 'https', or 'socks5', but got {proxy.url.scheme!r}."
)
elif proxy.url.scheme == "socks5":
try:
import socksio # noqa
- except ImportError: # pragma: nocover
+ except ImportError: # pragma: no cover
raise ImportError(
"Using SOCKS proxy, but the 'socksio' package is not installed. "
"Make sure to install httpx using `pip install httpx[socks]`."
http1=http1,
http2=http2,
)
- else: # pragma: nocover
+ else: # pragma: no cover
raise ValueError(
f"Proxy protocol must be either 'http', 'https', or 'socks5', but got {proxy.url.scheme!r}."
)
def __iter__(self) -> Iterator[bytes]:
raise NotImplementedError(
"The '__iter__' method must be implemented."
- ) # pragma: nocover
- yield b"" # pragma: nocover
+ ) # pragma: no cover
+ yield b"" # pragma: no cover
def close(self) -> None:
"""
async def __aiter__(self) -> AsyncIterator[bytes]:
raise NotImplementedError(
"The '__aiter__' method must be implemented."
- ) # pragma: nocover
- yield b"" # pragma: nocover
+ ) # pragma: no cover
+ yield b"" # pragma: no cover
async def aclose(self) -> None:
pass
if expanded_path.is_file():
self._netrc_info = netrc.netrc(str(expanded_path))
break
- except (netrc.NetrcParseError, IOError): # pragma: nocover
+ except (netrc.NetrcParseError, IOError): # pragma: no cover
# Issue while reading the netrc file, ignore...
pass
return self._netrc_info
class Logger(logging.Logger):
# Stub for type checkers.
def trace(self, message: str, *args: typing.Any, **kwargs: typing.Any) -> None:
- ... # pragma: nocover
+ ... # pragma: no cover
def get_logger(name: str) -> Logger:
import trio
return trio.current_time()
- elif library == "curio": # pragma: nocover
+ elif library == "curio": # pragma: no cover
import curio
return await curio.clock()
@pytest.mark.usefixtures("async_environment")
async def test_cannot_stream_sync_request(server):
- def hello_world(): # pragma: nocover
+ def hello_world(): # pragma: no cover
yield b"Hello, "
yield b"world!"
# Once we're closed we cannot reopen the client.
with pytest.raises(RuntimeError):
async with client:
- pass # pragma: nocover
+ pass # pragma: no cover
@pytest.mark.usefixtures("async_environment")
async def __aiter__(self) -> typing.AsyncIterator[bytes]:
yield b"Hello"
raise KeyboardInterrupt()
- yield b", world" # pragma: nocover
+ yield b", world" # pragma: no cover
async def aclose(self) -> None:
nonlocal stream_was_closed
app = DigestApp()
async def streaming_body():
- yield b"Example request body" # pragma: nocover
+ yield b"Example request body" # pragma: no cover
async with httpx.AsyncClient(transport=ConsumeBodyTransport(app)) as client:
with pytest.raises(httpx.StreamConsumed):
def test_cannot_stream_async_request(server):
- async def hello_world(): # pragma: nocover
+ async def hello_world(): # pragma: no cover
yield b"Hello, "
yield b"world!"
# Once we're closed we cannot reopen the client.
with pytest.raises(RuntimeError):
with client:
- pass # pragma: nocover
+ pass # pragma: no cover
def test_client_closed_state_using_with_block():
def test_client_event_hooks():
def on_request(request):
- pass # pragma: nocover
+ pass # pragma: no cover
client = httpx.Client()
client.event_hooks = {"request": [on_request]}
url = "https://example.org/redirect_body"
def streaming_body():
- yield b"Example request body" # pragma: nocover
+ yield b"Example request body" # pragma: no cover
with pytest.raises(httpx.StreamConsumed):
client.post(url, content=streaming_body(), follow_redirects=True)
async def sleep(seconds: float) -> None:
if sniffio.current_async_library() == "trio":
- await trio.sleep(seconds) # pragma: nocover
+ await trio.sleep(seconds) # pragma: no cover
else:
await asyncio.sleep(seconds)
}
await asyncio.wait(tasks)
- async def restart(self) -> None: # pragma: nocover
+ async def restart(self) -> None: # pragma: no cover
# This coroutine may be called from a different thread than the one the
# server is running on, and from an async environment that's not asyncio.
# For this reason, we use an event to coordinate with the server
while not self.started:
await sleep(0.2)
- async def watch_restarts(self): # pragma: nocover
+ async def watch_restarts(self): # pragma: no cover
while True:
if self.should_exit:
return
def test_iterable_content():
class Content:
def __iter__(self):
- yield b"test 123" # pragma: nocover
+ yield b"test 123" # pragma: no cover
request = httpx.Request("POST", "http://example.org", content=Content())
assert request.headers == {"Host": "example.org", "Transfer-Encoding": "chunked"}
def test_generator_with_transfer_encoding_header():
def content():
- yield b"test 123" # pragma: nocover
+ yield b"test 123" # pragma: no cover
request = httpx.Request("POST", "http://example.org", content=content())
assert request.headers == {"Host": "example.org", "Transfer-Encoding": "chunked"}
def test_generator_with_content_length_header():
def content():
- yield b"test 123" # pragma: nocover
+ yield b"test 123" # pragma: no cover
headers = {"Content-Length": "8"}
request = httpx.Request(
def test_cannot_access_streaming_content_without_read():
# Ensure that streaming requests
- def streaming_body(): # pragma: nocover
+ def streaming_body(): # pragma: no cover
yield ""
request = httpx.Request("POST", "http://example.org", content=streaming_body())
def test_transfer_encoding_header():
async def streaming_body(data):
- yield data # pragma: nocover
+ yield data # pragma: no cover
data = streaming_body(b"test 123")
"""
def streaming_body(data):
- yield data # pragma: nocover
+ yield data # pragma: no cover
data = streaming_body(b"abcd")
def test_override_content_length_header():
async def streaming_body(data):
- yield data # pragma: nocover
+ yield data # pragma: no cover
data = streaming_body(b"test 123")
headers = {"Content-Length": "8"}
def test_request_generator_content_picklable():
def content():
- yield b"test 123" # pragma: nocover
+ yield b"test 123" # pragma: no cover
request = httpx.Request("POST", "http://example.org", content=content())
pickle_request = pickle.loads(pickle.dumps(request))
def test_generator_with_transfer_encoding_header():
def content():
- yield b"test 123" # pragma: nocover
+ yield b"test 123" # pragma: no cover
response = httpx.Response(200, content=content())
assert response.headers == {"Transfer-Encoding": "chunked"}
def test_generator_with_content_length_header():
def content():
- yield b"test 123" # pragma: nocover
+ yield b"test 123" # pragma: no cover
headers = {"Content-Length": "8"}
response = httpx.Response(200, content=content(), headers=headers)
reason="requires OpenSSL 1.1.1 or higher",
)
@pytest.mark.skipif(sys.version_info < (3, 8), reason="requires python3.8 or higher")
-def test_ssl_config_support_for_keylog_file(tmpdir, monkeypatch): # pragma: nocover
+def test_ssl_config_support_for_keylog_file(tmpdir, monkeypatch): # pragma: no cover
with monkeypatch.context() as m:
m.delenv("SSLKEYLOGFILE", raising=False)
return chunk
async def __aiter__(self):
- yield self._content # pragma: nocover
+ yield self._content # pragma: no cover
headers, stream = encode_request(content=AsyncBytesIO(b"Hello, world!"))
assert not isinstance(stream, typing.Iterable)
and value is not httpcore.ConnectionNotAvailable
]
- if not_mapped: # pragma: nocover
+ if not_mapped: # pragma: no cover
pytest.fail(f"Unmapped httpcore exceptions: {not_mapped}")
and not hasattr(httpx, name)
]
- if not_exposed: # pragma: nocover
+ if not_exposed: # pragma: no cover
pytest.fail(f"Unexposed HTTPX exceptions: {not_exposed}")