From: Kim Christie Date: Mon, 13 Oct 2025 14:00:49 +0000 (+0100) Subject: Add `.wait_ready` to parser for clean server disconnects (#3690) X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=ae25e86f5ca5625d5bffd13865b458ba5fadf50d;p=thirdparty%2Fhttpx.git Add `.wait_ready` to parser for clean server disconnects (#3690) --- diff --git a/src/ahttpx/_parsers.py b/src/ahttpx/_parsers.py index 371b8702..2c0f153c 100644 --- a/src/ahttpx/_parsers.py +++ b/src/ahttpx/_parsers.py @@ -224,6 +224,13 @@ class HTTPParser: # Handle body close self.send_state = State.DONE + async def wait_ready(self) -> bool: + """ + Wait until read data starts arriving, and return `True`. + Return `False` if the stream closes. + """ + return await self.parser.wait_ready() + async def recv_method_line(self) -> tuple[bytes, bytes, bytes]: """ Receive the initial request method line: @@ -453,6 +460,15 @@ class ReadAheadParser: assert self._buffer == b'' self._buffer = buffer + async def wait_ready(self) -> bool: + """ + Attempt a read, and return True if read succeeds or False if the + stream is closed. The data remains in the read buffer. + """ + data = await self._read_some() + self._push_back(data) + return data != b'' + async def read(self, size: int) -> bytes: """ Read and return up to 'size' bytes from the stream, with I/O buffering provided. diff --git a/src/ahttpx/_server.py b/src/ahttpx/_server.py index d2dafe99..219342e2 100644 --- a/src/ahttpx/_server.py +++ b/src/ahttpx/_server.py @@ -32,9 +32,14 @@ class HTTPConnection: async def handle_requests(self): try: while not self._parser.is_closed(): + if not await self._parser.wait_ready(): + # Wait until we have read data, or return + # if the stream closes. + return + # Read the initial part of the request, + # and setup a stream for reading the body. method, url, headers = await self._recv_head() stream = HTTPStream(self._recv_body, self._reset) - # TODO: Handle endpoint exceptions async with Request(method, url, headers=headers, content=stream) as request: try: response = await self._endpoint(request) @@ -50,7 +55,10 @@ class HTTPConnection: await self._send_head(response) await self._send_body(response) if self._parser.is_keepalive(): + # If the client hasn't read the request body to + # completion, then do that here. await stream.read() + # Either revert to idle, or close the connection. await self._reset() except Exception: logger.error("Internal Server Error", exc_info=True) diff --git a/src/httpx/_parsers.py b/src/httpx/_parsers.py index 1e92f534..d275a111 100644 --- a/src/httpx/_parsers.py +++ b/src/httpx/_parsers.py @@ -224,6 +224,13 @@ class HTTPParser: # Handle body close self.send_state = State.DONE + def wait_ready(self) -> bool: + """ + Wait until read data starts arriving, and return `True`. + Return `False` if the stream closes. + """ + return self.parser.wait_ready() + def recv_method_line(self) -> tuple[bytes, bytes, bytes]: """ Receive the initial request method line: @@ -453,6 +460,15 @@ class ReadAheadParser: assert self._buffer == b'' self._buffer = buffer + def wait_ready(self) -> bool: + """ + Attempt a read, and return True if read succeeds or False if the + stream is closed. The data remains in the read buffer. + """ + data = self._read_some() + self._push_back(data) + return data != b'' + def read(self, size: int) -> bytes: """ Read and return up to 'size' bytes from the stream, with I/O buffering provided. diff --git a/src/httpx/_server.py b/src/httpx/_server.py index 44ec3bff..d5a88050 100644 --- a/src/httpx/_server.py +++ b/src/httpx/_server.py @@ -32,9 +32,14 @@ class HTTPConnection: def handle_requests(self): try: while not self._parser.is_closed(): + if not self._parser.wait_ready(): + # Wait until we have read data, or return + # if the stream closes. + return + # Read the initial part of the request, + # and setup a stream for reading the body. method, url, headers = self._recv_head() stream = HTTPStream(self._recv_body, self._reset) - # TODO: Handle endpoint exceptions with Request(method, url, headers=headers, content=stream) as request: try: response = self._endpoint(request) @@ -50,7 +55,10 @@ class HTTPConnection: self._send_head(response) self._send_body(response) if self._parser.is_keepalive(): + # If the client hasn't read the request body to + # completion, then do that here. stream.read() + # Either revert to idle, or close the connection. self._reset() except Exception: logger.error("Internal Server Error", exc_info=True) @@ -102,10 +110,7 @@ class HTTPServer: def wait(self): while(True): - try: - sleep(1) - except KeyboardInterrupt: - break + sleep(1) @contextlib.contextmanager