finally:
self._loop = loop
-    async def fork(
-        self,
-        coroutine1: typing.Callable,
-        args1: typing.Sequence,
-        coroutine2: typing.Callable,
-        args2: typing.Sequence,
-    ) -> None:
-        task1 = self.loop.create_task(coroutine1(*args1))
-        task2 = self.loop.create_task(coroutine2(*args2))
-
-        try:
-            await asyncio.gather(task1, task2)
-        finally:
-            pending: typing.Set[asyncio.Future[typing.Any]]  # Please mypy.
-            _, pending = await asyncio.wait({task1, task2}, timeout=0)
-            for task in pending:
-                task.cancel()
-                try:
-                    await task
-                except asyncio.CancelledError:
-                    pass
-
    def get_semaphore(self, limits: PoolLimits) -> BasePoolSemaphore:
        return PoolSemaphore(limits)
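A note on the block removed above: `asyncio.gather()` propagates the first exception but does not cancel the sibling task, which is why the `finally` clause exists at all. The `asyncio.wait(..., timeout=0)` call returns immediately and is only used to split the two tasks into done/pending, so whatever is still running can be cancelled and awaited (swallowing the resulting `CancelledError`). A minimal, self-contained sketch of that idiom outside httpx, with made-up names:

```python
import asyncio


async def run_both(coro1, coro2) -> None:
    # Same idiom as the removed fork(): gather() raises on the first failure,
    # then the finally block reaps whichever task is still pending.
    task1 = asyncio.ensure_future(coro1)
    task2 = asyncio.ensure_future(coro2)
    try:
        await asyncio.gather(task1, task2)
    finally:
        # timeout=0 returns immediately; we only want the (done, pending) split.
        _, pending = await asyncio.wait({task1, task2}, timeout=0)
        for task in pending:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass  # Cancellation was deliberate; don't mask the real error.


async def main() -> None:
    async def slow() -> None:
        await asyncio.sleep(1)

    async def boom() -> None:
        raise RuntimeError("Oops")

    try:
        await run_both(slow(), boom())
    except RuntimeError as exc:
        print(f"propagated: {exc!r}")  # slow() was cancelled, boom() re-raised.


asyncio.run(main())
```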
    def create_event(self) -> BaseEvent:
        return self.backend.create_event()
-
-    async def fork(
-        self,
-        coroutine1: typing.Callable,
-        args1: typing.Sequence,
-        coroutine2: typing.Callable,
-        args2: typing.Sequence,
-    ) -> None:
-        return await self.backend.fork(coroutine1, args1, coroutine2, args2)
    def create_event(self) -> BaseEvent:
        raise NotImplementedError()  # pragma: no cover
-
-    async def fork(
-        self,
-        coroutine1: typing.Callable,
-        args1: typing.Sequence,
-        coroutine2: typing.Callable,
-        args2: typing.Sequence,
-    ) -> None:
-        """
-        Run two coroutines concurrently.
-
-        This should start 'coroutine1' with '*args1' and 'coroutine2' with '*args2',
-        and wait for them to finish.
-
-        In case one of the coroutines raises an exception, cancel the other one then
-        raise. If the other coroutine had also raised an exception, ignore it.
-        """
-        raise NotImplementedError()  # pragma: no cover
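For reference, the docstring above is the whole contract: "run these two coroutines, and if either fails, tear down the other and propagate". A hypothetical usage sketch against the pre-removal API follows; only `lookup_backend` and `fork` are real (and `fork` is exactly what this diff deletes), while `do_work` and `watchdog` are made-up coroutines, not httpx code:

```python
import asyncio

from httpx.concurrency.base import lookup_backend


async def do_work() -> None:
    await asyncio.sleep(0.05)  # Pretend this is the real request/response work.


async def watchdog() -> None:
    raise RuntimeError("connection lost")  # Pretend we detected a failure.


async def main() -> None:
    backend = lookup_backend("asyncio")
    try:
        # Both coroutines start; when watchdog() raises, do_work() is cancelled
        # and the RuntimeError propagates out of fork().
        await backend.fork(do_work, [], watchdog, [])
    except RuntimeError as exc:
        print(f"fork propagated: {exc!r}")


asyncio.run(main())
```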
            functools.partial(coroutine, **kwargs) if kwargs else coroutine, *args
        )
-    async def fork(
-        self,
-        coroutine1: typing.Callable,
-        args1: typing.Sequence,
-        coroutine2: typing.Callable,
-        args2: typing.Sequence,
-    ) -> None:
-        try:
-            async with trio.open_nursery() as nursery:
-                nursery.start_soon(coroutine1, *args1)
-                nursery.start_soon(coroutine2, *args2)
-        except trio.MultiError as exc:
-            # In practice, we don't actually care about raising both
-            # exceptions, so let's raise either indeterminately.
-            raise exc.exceptions[0]
-
    def get_semaphore(self, limits: PoolLimits) -> BasePoolSemaphore:
        return PoolSemaphore(limits)
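The trio variant removed above needs no manual cleanup: a nursery cancels its remaining children as soon as one of them raises, and only when two or more non-cancellation exceptions occur does trio (in the versions this code targeted; newer trio uses `ExceptionGroup`) wrap them in `trio.MultiError`, hence the `except` branch that picks an arbitrary one to satisfy `fork()`'s "raise one of them" contract. A standalone sketch under those assumptions, not httpx code:

```python
import trio


async def slow() -> None:
    await trio.sleep(1)  # Cancelled by the nursery once boom() raises.


async def boom() -> None:
    raise RuntimeError("Oops")


async def main() -> None:
    try:
        async with trio.open_nursery() as nursery:
            nursery.start_soon(slow)
            nursery.start_soon(boom)
    except trio.MultiError as exc:  # Two or more simultaneous failures.
        print(f"multiple failures: {exc.exceptions!r}")
    except RuntimeError as exc:  # A single failure propagates unwrapped.
        print(f"propagated: {exc!r}")


trio.run(main)
```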
from httpx.concurrency.base import lookup_backend
from httpx.concurrency.trio import TrioBackend
from httpx.config import SSLConfig
-from tests.concurrency import get_cipher, run_concurrently, sleep
+from tests.concurrency import get_cipher, run_concurrently
async def read_response(stream, timeout: Timeout, should_contain: bytes) -> bytes:
    await stream.close()
-async def test_fork(backend):
-    backend = lookup_backend(backend)
-    ok_counter = 0
-
-    async def ok(delay: int) -> None:
-        nonlocal ok_counter
-        await sleep(backend, delay)
-        ok_counter += 1
-
-    async def fail(message: str, delay: int) -> None:
-        await sleep(backend, delay)
-        raise RuntimeError(message)
-
-    await backend.fork(ok, [0], ok, [0])
-    assert ok_counter == 2
-
-    with pytest.raises(RuntimeError, match="Oops"):
-        await backend.fork(ok, [0], fail, ["Oops", 0.01])
-
-    assert ok_counter == 3
-
-    with pytest.raises(RuntimeError, match="Oops"):
-        await backend.fork(ok, [0.01], fail, ["Oops", 0])
-
-    assert ok_counter == 3
-
-    with pytest.raises(RuntimeError, match="Oops"):
-        await backend.fork(fail, ["Oops", 0.01], ok, [0])
-
-    assert ok_counter == 4
-
-    with pytest.raises(RuntimeError, match="Oops"):
-        await backend.fork(fail, ["Oops", 0], ok, [0.01])
-
-    assert ok_counter == 4
-
-    with pytest.raises(RuntimeError, match="My bad"):
-        await backend.fork(fail, ["My bad", 0], fail, ["Oops", 0.01])
-
-    with pytest.raises(RuntimeError, match="Oops"):
-        await backend.fork(fail, ["My bad", 0.01], fail, ["Oops", 0])
-
-    # No 'match', since we can't know which will be raised first.
-    with pytest.raises(RuntimeError):
-        await backend.fork(fail, ["My bad", 0], fail, ["Oops", 0])
-
-
def test_lookup_backend():
    assert isinstance(lookup_backend("asyncio"), AsyncioBackend)
    assert isinstance(lookup_backend("trio"), TrioBackend)