await self.init_complete.wait()
stream_id = self.state.get_next_available_stream_id()
- stream = HTTP2Stream(stream_id=stream_id, connection=self, state=self.state)
+ stream = HTTP2Stream(stream_id=stream_id, connection=self)
self.streams[stream_id] = stream
self.events[stream_id] = []
return await stream.send(request, timeout)
data_to_send = self.state.data_to_send()
await self.socket.write(data_to_send, timeout)
- async def send_outgoing_data(self, timeout: Timeout) -> None:
+ async def send_headers(
+ self,
+ stream_id: int,
+ headers: typing.List[typing.Tuple[bytes, bytes]],
+ timeout: Timeout,
+ ) -> None:
+ self.state.send_headers(stream_id, headers)
+ data_to_send = self.state.data_to_send()
+ await self.socket.write(data_to_send, timeout)
+
+ async def send_data(self, stream_id: int, chunk: bytes, timeout: Timeout) -> None:
+ self.state.send_data(stream_id, chunk)
+ data_to_send = self.state.data_to_send()
+ await self.socket.write(data_to_send, timeout)
+
+ async def end_stream(self, stream_id: int, timeout: Timeout) -> None:
+ self.state.end_stream(stream_id)
data_to_send = self.state.data_to_send()
- if data_to_send:
- await self.socket.write(data_to_send, timeout)
+ await self.socket.write(data_to_send, timeout)
+
+ async def acknowledge_received_data(
+ self, stream_id: int, amount: int, timeout: Timeout
+ ) -> None:
+ self.state.acknowledge_received_data(amount, stream_id)
+ data_to_send = self.state.data_to_send()
+ await self.socket.write(data_to_send, timeout)
async def close_stream(self, stream_id: int) -> None:
del self.streams[stream_id]
class HTTP2Stream:
- def __init__(
- self,
- stream_id: int,
- connection: HTTP2Connection,
- state: h2.connection.H2Connection,
- ) -> None:
+ def __init__(self, stream_id: int, connection: HTTP2Connection) -> None:
self.stream_id = stream_id
self.connection = connection
- self.state = state
async def send(self, request: Request, timeout: Timeout) -> Response:
# Send the request.
f"target={request.url.full_path!r} "
f"headers={headers!r}"
)
- self.state.send_headers(self.stream_id, headers)
- await self.connection.send_outgoing_data(timeout)
+ await self.connection.send_headers(self.stream_id, headers, timeout)
async def send_body(self, request: Request, timeout: Timeout) -> None:
logger.trace(f"send_body stream_id={self.stream_id}")
)
chunk_size = min(len(data), max_flow)
chunk, data = data[:chunk_size], data[chunk_size:]
- self.state.send_data(self.stream_id, chunk)
- await self.connection.send_outgoing_data(timeout)
+ await self.connection.send_data(self.stream_id, chunk, timeout)
- self.state.end_stream(self.stream_id)
- await self.connection.send_outgoing_data(timeout)
+ await self.connection.end_stream(self.stream_id, timeout)
async def receive_response(
self, timeout: Timeout
while True:
event = await self.connection.wait_for_event(self.stream_id, timeout)
if isinstance(event, h2.events.DataReceived):
- self.state.acknowledge_received_data(
- event.flow_controlled_length, self.stream_id
+ amount = event.flow_controlled_length
+ await self.connection.acknowledge_received_data(
+ self.stream_id, amount, timeout
)
- await self.connection.send_outgoing_data(timeout)
yield event.data
elif isinstance(event, (h2.events.StreamEnded, h2.events.StreamReset)):
break