def backend(self) -> ConcurrencyBackend:
if not hasattr(self, "_backend_implementation"):
backend = sniffio.current_async_library()
- if backend not in ("asyncio", "trio"):
+ if backend not in ("asyncio", "trio"): # pragma: nocover
raise RuntimeError(f"Unsupported concurrency backend {backend!r}")
self._backend_implementation = lookup_backend(backend)
return self._backend_implementation
return typing.cast(Logger, logger)
-def kv_format(**kwargs: typing.Any) -> str:
- """Format arguments into a key=value line.
-
- >>> formatkv(x=1, name="Bob")
- "x=1 name='Bob'"
- """
- return " ".join(f"{key}={value!r}" for key, value in kwargs.items())
-
-
def should_not_be_proxied(url: "URL") -> bool:
""" Return True if url should not be proxied,
return False otherwise.
from httpx.backends.trio import TrioBackend
-@functools.singledispatch
-async def sleep(backend, seconds: int):
- raise NotImplementedError # pragma: no cover
-
-
-@sleep.register(AutoBackend)
-async def _sleep_auto(backend, seconds: int):
- return await sleep(backend.backend, seconds=seconds)
-
-
-@sleep.register(AsyncioBackend)
-async def _sleep_asyncio(backend, seconds: int):
- await asyncio.sleep(seconds)
-
-
-@sleep.register(TrioBackend)
-async def _sleep_trio(backend, seconds: int):
- await trio.sleep(seconds)
-
-
@functools.singledispatch
async def run_concurrently(backend, *coroutines: typing.Callable[[], typing.Awaitable]):
raise NotImplementedError # pragma: no cover
from httpx import Request, Timeout
from httpx.backends.base import BaseSocketStream, lookup_backend
-from tests.concurrency import sleep
class MockHTTP2Backend:
return "HTTP/2"
async def read(self, n, timeout, flag=None) -> bytes:
- await sleep(self.backend, 0)
send, self.buffer = self.buffer[:n], self.buffer[n:]
return send
self.backend.received_data.append(data)
async def read(self, n, timeout, flag=None) -> bytes:
- await sleep(self.backend.backend, 0)
if not self.backend.data_to_send:
return b""
return self.backend.data_to_send.pop(0)
body = b"test 123"
compressed_body = brotli.compress(body)[3:]
with pytest.raises(httpx.DecodingError):
- response = httpx.Response(
- 200, headers=headers, content=compressed_body, request=REQUEST
- )
- response.content
+ httpx.Response(200, headers=headers, content=compressed_body, request=REQUEST)
@pytest.mark.parametrize(