]> git.ipfire.org Git - thirdparty/httpx.git/commitdiff
Add stream ID to HTTP2Connection
authorTom Christie <tom@tomchristie.com>
Wed, 24 Apr 2019 16:04:15 +0000 (17:04 +0100)
committerTom Christie <tom@tomchristie.com>
Wed, 24 Apr 2019 16:04:15 +0000 (17:04 +0100)
httpcore/__init__.py
httpcore/connection.py
httpcore/connection_pool.py [moved from httpcore/connectionpool.py with 98% similarity]
httpcore/http11.py
httpcore/http2.py
httpcore/models.py [moved from httpcore/datastructures.py with 100% similarity]
httpcore/sync.py

index 48e3426ac46faa2cff68c6f26657136821adcb79..738d7ecd76a002fa15de1319694d0fe99873fdd6 100644 (file)
@@ -1,7 +1,6 @@
 from .config import PoolLimits, SSLConfig, TimeoutConfig
 from .connection import HTTPConnection
-from .connectionpool import ConnectionPool
-from .datastructures import URL, Origin, Request, Response
+from .connection_pool import ConnectionPool
 from .exceptions import (
     ConnectTimeout,
     PoolTimeout,
@@ -13,6 +12,7 @@ from .exceptions import (
 )
 from .http2 import HTTP2Connection
 from .http11 import HTTP11Connection
+from .models import URL, Origin, Request, Response
 from .sync import SyncClient, SyncConnectionPool
 
 __version__ = "0.2.1"
index 33f34594882406536bb0468f5964f853b7d4b7cb..db3a17e6c3ba5bd0d9e61dfe8ccade7933971ba0 100644 (file)
@@ -5,10 +5,10 @@ import h2.connection
 import h11
 
 from .config import DEFAULT_SSL_CONFIG, DEFAULT_TIMEOUT_CONFIG, SSLConfig, TimeoutConfig
-from .datastructures import Client, Origin, Request, Response
 from .exceptions import ConnectTimeout
 from .http2 import HTTP2Connection
 from .http11 import HTTP11Connection
+from .models import Client, Origin, Request, Response
 
 
 class HTTPConnection(Client):
similarity index 98%
rename from httpcore/connectionpool.py
rename to httpcore/connection_pool.py
index 54bb32f5916f709935eb28f7791378aead5ca4dc..6ec30289912e00028cf6f7735f0794685d4ac2a1 100644 (file)
@@ -11,8 +11,8 @@ from .config import (
     TimeoutConfig,
 )
 from .connection import HTTPConnection
-from .datastructures import Client, Origin, Request, Response
 from .exceptions import PoolTimeout
+from .models import Client, Origin, Request, Response
 
 
 class ConnectionPool(Client):
index f660867abc3275a1341062c9b8d20a8179cf2bc6..45994164ffdda3a2fa13dc3a1f8d74533db5a4d0 100644 (file)
@@ -4,8 +4,8 @@ import typing
 import h11
 
 from .config import DEFAULT_SSL_CONFIG, DEFAULT_TIMEOUT_CONFIG, SSLConfig, TimeoutConfig
-from .datastructures import Client, Origin, Request, Response
 from .exceptions import ConnectTimeout, ReadTimeout
+from .models import Client, Origin, Request, Response
 
 H11Event = typing.Union[
     h11.Request,
index 084a87ede466600e534bccfcf3438f0f9a31cbb4..0b32bc159204cd0aae8b0b9f682902eb9dc0d40c 100644 (file)
@@ -5,8 +5,8 @@ import h2.connection
 import h2.events
 
 from .config import DEFAULT_SSL_CONFIG, DEFAULT_TIMEOUT_CONFIG, SSLConfig, TimeoutConfig
-from .datastructures import Client, Origin, Request, Response
 from .exceptions import ConnectTimeout, ReadTimeout
+from .models import Client, Origin, Request, Response
 
 
 class HTTP2Connection(Client):
@@ -24,7 +24,8 @@ class HTTP2Connection(Client):
         self.timeout = timeout
         self.on_release = on_release
         self.h2_state = h2.connection.H2Connection()
-        self.events = []  # type: typing.List[h2.events.Event]
+        self.events = {}  # type: typing.Dict[int, typing.List[h2.events.Event]]
+        self.initialized = False
 
     @property
     def is_closed(self) -> bool:
@@ -40,20 +41,24 @@ class HTTP2Connection(Client):
         if timeout is None:
             timeout = self.timeout
 
+        if not self.initialized:
+            self.initiate_connection()
+
         #  Start sending the request.
-        await self._initiate_connection()
-        await self._send_headers(request)
+        stream_id = self.h2_state.get_next_available_stream_id()
+        self.events[stream_id] = []
+        await self.send_headers(stream_id, request)
 
         # Send the request body.
-        if request.body:
-            await self._send_data(request.body)
+        async for data in request.stream():
+            await self.send_data(stream_id, data)
 
         # Finalize sending the request.
-        await self._end_stream()
+        await self.end_stream(stream_id)
 
         # Start getting the response.
         while True:
-            event = await self._receive_event(timeout)
+            event = await self.receive_event(stream_id, timeout)
             if isinstance(event, h2.events.ResponseReceived):
                 break
 
@@ -65,51 +70,57 @@ class HTTP2Connection(Client):
             elif not k.startswith(b":"):
                 headers.append((k, v))
 
-        body = self._body_iter(timeout)
+        body = self.body_iter(stream_id, timeout)
         return Response(
             status_code=status_code,
             protocol="HTTP/2",
             headers=headers,
             body=body,
-            on_close=self._release,
+            on_close=self.release,
         )
 
-    async def _initiate_connection(self) -> None:
+    def initiate_connection(self) -> None:
         self.h2_state.initiate_connection()
         data_to_send = self.h2_state.data_to_send()
         self.writer.write(data_to_send)
+        self.initialized = True
 
-    async def _send_headers(self, request: Request) -> None:
+    async def send_headers(self, stream_id: int, request: Request) -> None:
         headers = [
             (b":method", request.method.encode()),
             (b":authority", request.url.hostname.encode()),
             (b":scheme", request.url.scheme.encode()),
             (b":path", request.url.full_path.encode()),
         ] + request.headers
-        self.h2_state.send_headers(1, headers)
+        self.h2_state.send_headers(stream_id, headers)
         data_to_send = self.h2_state.data_to_send()
         self.writer.write(data_to_send)
 
-    async def _send_data(self, data: bytes) -> None:
-        self.h2_state.send_data(1, data)
+    async def send_data(self, stream_id: int, data: bytes) -> None:
+        self.h2_state.send_data(stream_id, data)
         data_to_send = self.h2_state.data_to_send()
         self.writer.write(data_to_send)
 
-    async def _end_stream(self) -> None:
-        self.h2_state.end_stream(1)
+    async def end_stream(self, stream_id: int) -> None:
+        self.h2_state.end_stream(stream_id)
         data_to_send = self.h2_state.data_to_send()
         self.writer.write(data_to_send)
 
-    async def _body_iter(self, timeout: TimeoutConfig) -> typing.AsyncIterator[bytes]:
+    async def body_iter(
+        self, stream_id: int, timeout: TimeoutConfig
+    ) -> typing.AsyncIterator[bytes]:
         while True:
-            event = await self._receive_event(timeout)
+            event = await self.receive_event(stream_id, timeout)
             if isinstance(event, h2.events.DataReceived):
                 yield event.data
             elif isinstance(event, h2.events.StreamEnded):
+                del self.events[stream_id]
                 break
 
-    async def _receive_event(self, timeout: TimeoutConfig) -> h2.events.Event:
-        while not self.events:
+    async def receive_event(
+        self, stream_id: int, timeout: TimeoutConfig
+    ) -> h2.events.Event:
+        while not self.events[stream_id]:
             try:
                 data = await asyncio.wait_for(
                     self.reader.read(2048), timeout.read_timeout
@@ -118,35 +129,19 @@ class HTTP2Connection(Client):
                 raise ReadTimeout()
 
             events = self.h2_state.receive_data(data)
-            self.events.extend(events)
+            for event in events:
+                if getattr(event, "stream_id", 0):
+                    self.events[event.stream_id].append(event)
 
             data_to_send = self.h2_state.data_to_send()
             if data_to_send:
                 self.writer.write(data_to_send)
 
-        return self.events.pop(0)
-
-    async def _release(self) -> None:
-        # if (
-        #     self.h11_state.our_state is h11.DONE
-        #     and self.h11_state.their_state is h11.DONE
-        # ):
-        #     self.h11_state.start_next_cycle()
-        # else:
-        #     await self.close()
+        return self.events[stream_id].pop(0)
 
+    async def release(self) -> None:
         if self.on_release is not None:
             await self.on_release(self)
 
     async def close(self) -> None:
-        # event = h11.ConnectionClosed()
-        # try:
-        #     # If we're in h11.MUST_CLOSE then we'll end up in h11.CLOSED.
-        #     self.h11_state.send(event)
-        # except h11.ProtocolError:
-        #     # If we're in some other state then it's a premature close,
-        #     # and we'll end up in h11.ERROR.
-        #     pass
-
-        if self.writer is not None:
-            self.writer.close()
+        self.writer.close()
index 1d560faf91b2c31498d13071221d5b989f73e4a6..b1f98f50e128ff83cd69b1fb3bc2dc1b48a552c7 100644 (file)
@@ -3,8 +3,8 @@ import typing
 from types import TracebackType
 
 from .config import SSLConfig, TimeoutConfig
-from .connectionpool import ConnectionPool
-from .datastructures import URL, Client, Response
+from .connection_pool import ConnectionPool
+from .models import URL, Client, Response
 
 
 class SyncResponse: