]> git.ipfire.org Git - thirdparty/httpx.git/commitdiff
Server connection handling. (#3672)
authorKim Christie <kim@encode.io>
Fri, 19 Sep 2025 11:03:13 +0000 (12:03 +0100)
committerGitHub <noreply@github.com>
Fri, 19 Sep 2025 11:03:13 +0000 (12:03 +0100)
src/ahttpx/_parsers.py
src/ahttpx/_pool.py
src/ahttpx/_server.py
src/httpx/_network.py
src/httpx/_parsers.py
src/httpx/_pool.py
src/httpx/_server.py
tests/test_parsers.py

index 8a52a56fdf2349489bdfe57fd9e5c1c8f2d06257..371b870201be415a01f17ff66bc23cc63e082ea9 100644 (file)
@@ -375,13 +375,13 @@ class HTTPParser:
             self.recv_state = State.DONE
         return body
 
-    async def complete(self):
+    async def reset(self) -> bool:
         is_fully_complete = self.send_state == State.DONE and self.recv_state == State.DONE
         is_keepalive = self.send_keep_alive and self.recv_keep_alive
 
         if not (is_fully_complete and is_keepalive):
             await self.close()
-            return
+            return False
 
         if self.mode == Mode.CLIENT:
             self.send_state = State.SEND_METHOD_LINE
@@ -397,6 +397,7 @@ class HTTPParser:
         self.send_keep_alive = True
         self.recv_keep_alive = True
         self.processing_1xx = False
+        return True
 
     async def close(self):
         if self.send_state != State.CLOSED:
@@ -404,6 +405,13 @@ class HTTPParser:
             self.recv_state = State.CLOSED
             await self.stream.close()
 
+    def is_keepalive(self) -> bool:
+        return (
+            self.send_keep_alive and
+            self.recv_keep_alive and
+            self.send_state != State.CLOSED
+        )
+
     def is_idle(self) -> bool:
         return (
             self.send_state == State.SEND_METHOD_LINE or
index f712cfac277159ac4bf5bc68772aeb3dc6dfee91..4a9f787b525cc0da629746e19094cf558f4eee9a 100644 (file)
@@ -170,7 +170,7 @@ class Connection(Transport):
         await self._send_head(request)
         await self._send_body(request)
         code, headers = await self._recv_head()
-        stream = HTTPStream(self._recv_body, self._complete)
+        stream = HTTPStream(self._recv_body, self._reset)
         # TODO...
         return Response(code, headers=headers, content=stream)
         #    finally:
@@ -235,9 +235,9 @@ class Connection(Transport):
     async def _recv_body(self) -> bytes:
         return await self._parser.recv_body()
 
-    # Request/response cycle complete...
-    async def _complete(self) -> None:
-        await self._parser.complete()
+    # Request/response cycle reset...
+    async def _reset(self) -> None:
+        await self._parser.reset()
         self._idle_expiry = time.monotonic() + self._keepalive_duration
 
     async def _close(self) -> None:
index a9103cc97f69e2b3a56df6b8d95f4c88d0d4837e..d2dafe99f7b96474ab46e27d75476172db66e7af 100644 (file)
@@ -33,7 +33,7 @@ class HTTPConnection:
         try:
             while not self._parser.is_closed():
                 method, url, headers = await self._recv_head()
-                stream = HTTPStream(self._recv_body, self._complete)
+                stream = HTTPStream(self._recv_body, self._reset)
                 # TODO: Handle endpoint exceptions
                 async with Request(method, url, headers=headers, content=stream) as request:
                     try:
@@ -43,12 +43,15 @@ class HTTPConnection:
                     except Exception:
                         logger.error("Internal Server Error", exc_info=True)
                         content = Text("Internal Server Error")
-                        err = Response(code=500, content=content)
+                        err = Response(500, content=content)
                         await self._send_head(err)
                         await self._send_body(err)
                     else:
                         await self._send_head(response)
                         await self._send_body(response)
+                if self._parser.is_keepalive():
+                    await stream.read()
+                await self._reset()
         except Exception:
             logger.error("Internal Server Error", exc_info=True)
 
@@ -88,8 +91,8 @@ class HTTPConnection:
         await self._parser.send_body(b'')
 
     # Start it all over again...
-    async def _complete(self):
-        await self._parser.complete
+    async def _reset(self):
+        await self._parser.reset()
         self._idle_expiry = time.monotonic() + self._keepalive_duration
 
 
index 5ea9bb5472377641bf6515a04718494b91e87546..a204d2068b69acf1e277eb450cfdfa81f549ff7c 100644 (file)
@@ -83,6 +83,9 @@ class NetworkStream(Stream):
             self._is_closed = True
             self._socket.close()
 
+    def is_closed(self) -> bool:
+        return self._is_closed
+
     def __repr__(self):
         description = ""
         description += " TLS" if self._is_tls else ""
@@ -160,7 +163,7 @@ class NetworkServer:
         self._max_workers = 5
         self._executor = None
         self._thread = None
-        self._streams = list[NetworkStream]
+        self._streams: list[NetworkStream] = []
 
     @property
     def host(self):
@@ -177,11 +180,18 @@ class NetworkServer:
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.listener.close()
+        for stream in self._streams:
+            stream.close()
         self._executor.shutdown(wait=True)
 
     def _serve(self):
         while stream := self.listener.accept():
             self._executor.submit(self._handler, stream)
+            self._streams = [
+                stream for stream in self._streams
+                if not stream.is_closed()
+            ]
+            self._streams.append(stream)
 
     def _handler(self, stream):
         try:
index 830fccd90188f19a2f2c373510d396c409728e26..1e92f5347c6b25ec2d797cec146e67e293337981 100644 (file)
@@ -375,13 +375,13 @@ class HTTPParser:
             self.recv_state = State.DONE
         return body
 
-    def complete(self):
+    def reset(self) -> bool:
         is_fully_complete = self.send_state == State.DONE and self.recv_state == State.DONE
         is_keepalive = self.send_keep_alive and self.recv_keep_alive
 
         if not (is_fully_complete and is_keepalive):
             self.close()
-            return
+            return False
 
         if self.mode == Mode.CLIENT:
             self.send_state = State.SEND_METHOD_LINE
@@ -397,6 +397,7 @@ class HTTPParser:
         self.send_keep_alive = True
         self.recv_keep_alive = True
         self.processing_1xx = False
+        return True
 
     def close(self):
         if self.send_state != State.CLOSED:
@@ -404,6 +405,13 @@ class HTTPParser:
             self.recv_state = State.CLOSED
             self.stream.close()
 
+    def is_keepalive(self) -> bool:
+        return (
+            self.send_keep_alive and
+            self.recv_keep_alive and
+            self.send_state != State.CLOSED
+        )
+
     def is_idle(self) -> bool:
         return (
             self.send_state == State.SEND_METHOD_LINE or
index 7193f8d81c06725acd0a0e19b83f7d0074b6c214..b56f29eaad67a37686ae468351a10459ed4445fb 100644 (file)
@@ -170,7 +170,7 @@ class Connection(Transport):
         self._send_head(request)
         self._send_body(request)
         code, headers = self._recv_head()
-        stream = HTTPStream(self._recv_body, self._complete)
+        stream = HTTPStream(self._recv_body, self._reset)
         # TODO...
         return Response(code, headers=headers, content=stream)
         #    finally:
@@ -235,9 +235,9 @@ class Connection(Transport):
     def _recv_body(self) -> bytes:
         return self._parser.recv_body()
 
-    # Request/response cycle complete...
-    def _complete(self) -> None:
-        self._parser.complete()
+    # Request/response cycle reset...
+    def _reset(self) -> None:
+        self._parser.reset()
         self._idle_expiry = time.monotonic() + self._keepalive_duration
 
     def _close(self) -> None:
index 95226d991415651bf33dc1aba4af1985f9b1a484..44ec3bff3e49700d3027a70a4ce3420c580e8ca6 100644 (file)
@@ -33,7 +33,7 @@ class HTTPConnection:
         try:
             while not self._parser.is_closed():
                 method, url, headers = self._recv_head()
-                stream = HTTPStream(self._recv_body, self._complete)
+                stream = HTTPStream(self._recv_body, self._reset)
                 # TODO: Handle endpoint exceptions
                 with Request(method, url, headers=headers, content=stream) as request:
                     try:
@@ -43,12 +43,15 @@ class HTTPConnection:
                     except Exception:
                         logger.error("Internal Server Error", exc_info=True)
                         content = Text("Internal Server Error")
-                        err = Response(code=500, content=content)
+                        err = Response(500, content=content)
                         self._send_head(err)
                         self._send_body(err)
                     else:
                         self._send_head(response)
                         self._send_body(response)
+                if self._parser.is_keepalive():
+                    stream.read()
+                self._reset()
         except Exception:
             logger.error("Internal Server Error", exc_info=True)
 
@@ -88,8 +91,8 @@ class HTTPConnection:
         self._parser.send_body(b'')
 
     # Start it all over again...
-    def _complete(self):
-        self._parser.complete
+    def _reset(self):
+        self._parser.reset()
         self._idle_expiry = time.monotonic() + self._keepalive_duration
 
 
@@ -99,7 +102,10 @@ class HTTPServer:
 
     def wait(self):
         while(True):
-            sleep(1)
+            try:
+                sleep(1)
+            except KeyboardInterrupt:
+                break
 
 
 @contextlib.contextmanager
index e2a321e2a2d68902c5f4a6e8f698666d137ebe89..32b20deba48535b4f550cbfc8af76299c200c8f0 100644 (file)
@@ -67,7 +67,7 @@ def test_parser():
     assert terminator == b''
 
     assert not p.is_idle()
-    p.complete()
+    p.reset()
     assert p.is_idle()
 
 
@@ -113,7 +113,7 @@ def test_parser_server():
     )
 
     assert not p.is_idle()
-    p.complete()
+    p.reset()
     assert p.is_idle()
 
 
@@ -315,7 +315,7 @@ def test_parser_repr():
     p.recv_body()
     assert repr(p) == "<HTTPParser [client DONE, server DONE]>"
 
-    p.complete()
+    p.reset()
     assert repr(p) == "<HTTPParser [client SEND_METHOD_LINE, server WAIT]>"
 
 
@@ -554,7 +554,7 @@ def test_client_connection_close():
 
     assert repr(p) == "<HTTPParser [client DONE, server DONE]>"
 
-    p.complete()
+    p.reset()
     assert repr(p) == "<HTTPParser [client CLOSED, server CLOSED]>"
     assert p.is_closed()
 
@@ -591,7 +591,7 @@ def test_server_connection_close():
     assert terminator == b""
 
     assert repr(p) == "<HTTPParser [client DONE, server DONE]>"
-    p.complete()
+    p.reset()
     assert repr(p) == "<HTTPParser [client CLOSED, server CLOSED]>"