]> git.ipfire.org Git - thirdparty/httpx.git/commitdiff
Lock around stream.write and stream.close operations (#699)
authorTom Christie <tom@tomchristie.com>
Tue, 31 Dec 2019 13:18:00 +0000 (13:18 +0000)
committerGitHub <noreply@github.com>
Tue, 31 Dec 2019 13:18:00 +0000 (13:18 +0000)
httpx/backends/asyncio.py
httpx/backends/trio.py

index 1859e96a2da75a9ff2b1c53d369b1fabc09d1abe..854618e119d0b01fabeb67d422245daaabf60c2a 100644 (file)
@@ -77,6 +77,7 @@ class SocketStream(BaseSocketStream):
         self.stream_reader = stream_reader
         self.stream_writer = stream_writer
         self.read_lock = asyncio.Lock()
+        self.write_lock = asyncio.Lock()
 
         self._inner: typing.Optional[SocketStream] = None
 
@@ -135,11 +136,12 @@ class SocketStream(BaseSocketStream):
         if not data:
             return
 
-        self.stream_writer.write(data)
         try:
-            return await asyncio.wait_for(
-                self.stream_writer.drain(), timeout.write_timeout
-            )
+            async with self.write_lock:
+                self.stream_writer.write(data)
+                return await asyncio.wait_for(
+                    self.stream_writer.drain(), timeout.write_timeout
+                )
         except asyncio.TimeoutError:
             raise WriteTimeout() from None
 
@@ -168,7 +170,8 @@ class SocketStream(BaseSocketStream):
         # This is fine, though, because '.close()' schedules the actual closing of the
         # stream, meaning that at best it will happen during the next event loop
         # iteration, and at worst asyncio will take care of it on program exit.
-        self.stream_writer.close()
+        async with self.write_lock:
+            self.stream_writer.close()
 
 
 class AsyncioBackend(ConcurrencyBackend):
index 7c26dae376bf6240a38afe29528a73592e7c11e2..6e79dc8b5b08dca9c87dc63bb11283d3ad7ef115 100644 (file)
@@ -79,7 +79,8 @@ class SocketStream(BaseSocketStream):
         return stream.socket.is_readable()
 
     async def close(self) -> None:
-        await self.stream.aclose()
+        async with self.write_lock:
+            await self.stream.aclose()
 
 
 class TrioBackend(ConcurrencyBackend):