self.stream_reader = stream_reader
self.stream_writer = stream_writer
self.read_lock = asyncio.Lock()
+ self.write_lock = asyncio.Lock()
self._inner: typing.Optional[SocketStream] = None
if not data:
return
- self.stream_writer.write(data)
try:
- return await asyncio.wait_for(
- self.stream_writer.drain(), timeout.write_timeout
- )
+ async with self.write_lock:
+ self.stream_writer.write(data)
+ return await asyncio.wait_for(
+ self.stream_writer.drain(), timeout.write_timeout
+ )
except asyncio.TimeoutError:
raise WriteTimeout() from None
# This is fine, though, because '.close()' schedules the actual closing of the
# stream, meaning that at best it will happen during the next event loop
# iteration, and at worst asyncio will take care of it on program exit.
- self.stream_writer.close()
+ async with self.write_lock:
+ self.stream_writer.close()
class AsyncioBackend(ConcurrencyBackend):
return stream.socket.is_readable()
async def close(self) -> None:
- await self.stream.aclose()
+ async with self.write_lock:
+ await self.stream.aclose()
class TrioBackend(ConcurrencyBackend):