From: Kim Christie Date: Fri, 19 Sep 2025 11:03:13 +0000 (+0100) Subject: Server connection handling. (#3672) X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=68989ae47d9fd10381a3859619a3880014d187f6;p=thirdparty%2Fhttpx.git Server connection handling. (#3672) --- diff --git a/src/ahttpx/_parsers.py b/src/ahttpx/_parsers.py index 8a52a56f..371b8702 100644 --- a/src/ahttpx/_parsers.py +++ b/src/ahttpx/_parsers.py @@ -375,13 +375,13 @@ class HTTPParser: self.recv_state = State.DONE return body - async def complete(self): + async def reset(self) -> bool: is_fully_complete = self.send_state == State.DONE and self.recv_state == State.DONE is_keepalive = self.send_keep_alive and self.recv_keep_alive if not (is_fully_complete and is_keepalive): await self.close() - return + return False if self.mode == Mode.CLIENT: self.send_state = State.SEND_METHOD_LINE @@ -397,6 +397,7 @@ class HTTPParser: self.send_keep_alive = True self.recv_keep_alive = True self.processing_1xx = False + return True async def close(self): if self.send_state != State.CLOSED: @@ -404,6 +405,13 @@ class HTTPParser: self.recv_state = State.CLOSED await self.stream.close() + def is_keepalive(self) -> bool: + return ( + self.send_keep_alive and + self.recv_keep_alive and + self.send_state != State.CLOSED + ) + def is_idle(self) -> bool: return ( self.send_state == State.SEND_METHOD_LINE or diff --git a/src/ahttpx/_pool.py b/src/ahttpx/_pool.py index f712cfac..4a9f787b 100644 --- a/src/ahttpx/_pool.py +++ b/src/ahttpx/_pool.py @@ -170,7 +170,7 @@ class Connection(Transport): await self._send_head(request) await self._send_body(request) code, headers = await self._recv_head() - stream = HTTPStream(self._recv_body, self._complete) + stream = HTTPStream(self._recv_body, self._reset) # TODO... return Response(code, headers=headers, content=stream) # finally: @@ -235,9 +235,9 @@ class Connection(Transport): async def _recv_body(self) -> bytes: return await self._parser.recv_body() - # Request/response cycle complete... - async def _complete(self) -> None: - await self._parser.complete() + # Request/response cycle reset... + async def _reset(self) -> None: + await self._parser.reset() self._idle_expiry = time.monotonic() + self._keepalive_duration async def _close(self) -> None: diff --git a/src/ahttpx/_server.py b/src/ahttpx/_server.py index a9103cc9..d2dafe99 100644 --- a/src/ahttpx/_server.py +++ b/src/ahttpx/_server.py @@ -33,7 +33,7 @@ class HTTPConnection: try: while not self._parser.is_closed(): method, url, headers = await self._recv_head() - stream = HTTPStream(self._recv_body, self._complete) + stream = HTTPStream(self._recv_body, self._reset) # TODO: Handle endpoint exceptions async with Request(method, url, headers=headers, content=stream) as request: try: @@ -43,12 +43,15 @@ class HTTPConnection: except Exception: logger.error("Internal Server Error", exc_info=True) content = Text("Internal Server Error") - err = Response(code=500, content=content) + err = Response(500, content=content) await self._send_head(err) await self._send_body(err) else: await self._send_head(response) await self._send_body(response) + if self._parser.is_keepalive(): + await stream.read() + await self._reset() except Exception: logger.error("Internal Server Error", exc_info=True) @@ -88,8 +91,8 @@ class HTTPConnection: await self._parser.send_body(b'') # Start it all over again... - async def _complete(self): - await self._parser.complete + async def _reset(self): + await self._parser.reset() self._idle_expiry = time.monotonic() + self._keepalive_duration diff --git a/src/httpx/_network.py b/src/httpx/_network.py index 5ea9bb54..a204d206 100644 --- a/src/httpx/_network.py +++ b/src/httpx/_network.py @@ -83,6 +83,9 @@ class NetworkStream(Stream): self._is_closed = True self._socket.close() + def is_closed(self) -> bool: + return self._is_closed + def __repr__(self): description = "" description += " TLS" if self._is_tls else "" @@ -160,7 +163,7 @@ class NetworkServer: self._max_workers = 5 self._executor = None self._thread = None - self._streams = list[NetworkStream] + self._streams: list[NetworkStream] = [] @property def host(self): @@ -177,11 +180,18 @@ class NetworkServer: def __exit__(self, exc_type, exc_val, exc_tb): self.listener.close() + for stream in self._streams: + stream.close() self._executor.shutdown(wait=True) def _serve(self): while stream := self.listener.accept(): self._executor.submit(self._handler, stream) + self._streams = [ + stream for stream in self._streams + if not stream.is_closed() + ] + self._streams.append(stream) def _handler(self, stream): try: diff --git a/src/httpx/_parsers.py b/src/httpx/_parsers.py index 830fccd9..1e92f534 100644 --- a/src/httpx/_parsers.py +++ b/src/httpx/_parsers.py @@ -375,13 +375,13 @@ class HTTPParser: self.recv_state = State.DONE return body - def complete(self): + def reset(self) -> bool: is_fully_complete = self.send_state == State.DONE and self.recv_state == State.DONE is_keepalive = self.send_keep_alive and self.recv_keep_alive if not (is_fully_complete and is_keepalive): self.close() - return + return False if self.mode == Mode.CLIENT: self.send_state = State.SEND_METHOD_LINE @@ -397,6 +397,7 @@ class HTTPParser: self.send_keep_alive = True self.recv_keep_alive = True self.processing_1xx = False + return True def close(self): if self.send_state != State.CLOSED: @@ -404,6 +405,13 @@ class HTTPParser: self.recv_state = State.CLOSED self.stream.close() + def is_keepalive(self) -> bool: + return ( + self.send_keep_alive and + self.recv_keep_alive and + self.send_state != State.CLOSED + ) + def is_idle(self) -> bool: return ( self.send_state == State.SEND_METHOD_LINE or diff --git a/src/httpx/_pool.py b/src/httpx/_pool.py index 7193f8d8..b56f29ea 100644 --- a/src/httpx/_pool.py +++ b/src/httpx/_pool.py @@ -170,7 +170,7 @@ class Connection(Transport): self._send_head(request) self._send_body(request) code, headers = self._recv_head() - stream = HTTPStream(self._recv_body, self._complete) + stream = HTTPStream(self._recv_body, self._reset) # TODO... return Response(code, headers=headers, content=stream) # finally: @@ -235,9 +235,9 @@ class Connection(Transport): def _recv_body(self) -> bytes: return self._parser.recv_body() - # Request/response cycle complete... - def _complete(self) -> None: - self._parser.complete() + # Request/response cycle reset... + def _reset(self) -> None: + self._parser.reset() self._idle_expiry = time.monotonic() + self._keepalive_duration def _close(self) -> None: diff --git a/src/httpx/_server.py b/src/httpx/_server.py index 95226d99..44ec3bff 100644 --- a/src/httpx/_server.py +++ b/src/httpx/_server.py @@ -33,7 +33,7 @@ class HTTPConnection: try: while not self._parser.is_closed(): method, url, headers = self._recv_head() - stream = HTTPStream(self._recv_body, self._complete) + stream = HTTPStream(self._recv_body, self._reset) # TODO: Handle endpoint exceptions with Request(method, url, headers=headers, content=stream) as request: try: @@ -43,12 +43,15 @@ class HTTPConnection: except Exception: logger.error("Internal Server Error", exc_info=True) content = Text("Internal Server Error") - err = Response(code=500, content=content) + err = Response(500, content=content) self._send_head(err) self._send_body(err) else: self._send_head(response) self._send_body(response) + if self._parser.is_keepalive(): + stream.read() + self._reset() except Exception: logger.error("Internal Server Error", exc_info=True) @@ -88,8 +91,8 @@ class HTTPConnection: self._parser.send_body(b'') # Start it all over again... - def _complete(self): - self._parser.complete + def _reset(self): + self._parser.reset() self._idle_expiry = time.monotonic() + self._keepalive_duration @@ -99,7 +102,10 @@ class HTTPServer: def wait(self): while(True): - sleep(1) + try: + sleep(1) + except KeyboardInterrupt: + break @contextlib.contextmanager diff --git a/tests/test_parsers.py b/tests/test_parsers.py index e2a321e2..32b20deb 100644 --- a/tests/test_parsers.py +++ b/tests/test_parsers.py @@ -67,7 +67,7 @@ def test_parser(): assert terminator == b'' assert not p.is_idle() - p.complete() + p.reset() assert p.is_idle() @@ -113,7 +113,7 @@ def test_parser_server(): ) assert not p.is_idle() - p.complete() + p.reset() assert p.is_idle() @@ -315,7 +315,7 @@ def test_parser_repr(): p.recv_body() assert repr(p) == "" - p.complete() + p.reset() assert repr(p) == "" @@ -554,7 +554,7 @@ def test_client_connection_close(): assert repr(p) == "" - p.complete() + p.reset() assert repr(p) == "" assert p.is_closed() @@ -591,7 +591,7 @@ def test_server_connection_close(): assert terminator == b"" assert repr(p) == "" - p.complete() + p.reset() assert repr(p) == ""