self.stream_reader = stream_reader
self.stream_writer = stream_writer
self.timeout = timeout
+ self.read_lock = asyncio.Lock()
self._inner: typing.Optional[SocketStream] = None
should_raise = flag is None or flag.raise_on_read_timeout
read_timeout = timeout.read_timeout if should_raise else 0.01
try:
- data = await asyncio.wait_for(self.stream_reader.read(n), read_timeout)
- break
+ async with self.read_lock:
+ data = await asyncio.wait_for(
+ self.stream_reader.read(n), read_timeout
+ )
except asyncio.TimeoutError:
if should_raise:
raise ReadTimeout() from None
# doesn't seem to allow on 3.6.
# See: https://github.com/encode/httpx/issues/382
await asyncio.sleep(0)
+ else:
+ break
return data
self.stream = stream
self.timeout = timeout
self.write_buffer = b""
+ self.read_lock = trio.Lock()
self.write_lock = trio.Lock()
async def start_tls(
read_timeout = _or_inf(timeout.read_timeout if should_raise else 0.01)
with trio.move_on_after(read_timeout):
- return await self.stream.receive_some(max_bytes=n)
+ async with self.read_lock:
+ return await self.stream.receive_some(max_bytes=n)
if should_raise:
raise ReadTimeout() from None
import asyncio
import functools
+import typing
+
+import trio
from httpx import AsyncioBackend
+from httpx.concurrency.trio import TrioBackend
@functools.singledispatch
await asyncio.sleep(seconds)
-try:
- import trio
- from httpx.concurrency.trio import TrioBackend
-except ImportError: # pragma: no cover
- pass
-else:
+@sleep.register(TrioBackend)
+async def _sleep_trio(backend, seconds: int):
+ await trio.sleep(seconds)
+
+
+@functools.singledispatch
+async def run_concurrently(backend, *coroutines: typing.Callable[[], typing.Awaitable]):
+ raise NotImplementedError # pragma: no cover
+
+
+@run_concurrently.register(AsyncioBackend)
+async def _run_concurrently_asyncio(backend, *coroutines):
+ coros = (coroutine() for coroutine in coroutines)
+ await asyncio.gather(*coros)
+
- @sleep.register(TrioBackend)
- async def _sleep_trio(backend, seconds: int):
- await trio.sleep(seconds)
+@run_concurrently.register(TrioBackend)
+async def _run_concurrently_trio(backend, *coroutines):
+ async with trio.open_nursery() as nursery:
+ for coroutine in coroutines:
+ nursery.start_soon(coroutine)
from httpx import AsyncioBackend, HTTPVersionConfig, SSLConfig, TimeoutConfig
from httpx.concurrency.trio import TrioBackend
+from tests.concurrency import run_concurrently
@pytest.mark.parametrize(
finally:
await stream.close()
+
+
+async def test_concurrent_read(server, backend):
+ """
+ Regression test for: https://github.com/encode/httpx/issues/527
+ """
+ stream = await backend.open_tcp_stream(
+ server.url.host, server.url.port, ssl_context=None, timeout=TimeoutConfig(5)
+ )
+ try:
+ await stream.write(b"GET / HTTP/1.1\r\n\r\n")
+ await run_concurrently(
+ backend, lambda: stream.read(10), lambda: stream.read(10)
+ )
+ finally:
+ await stream.close()