def __init__(
self,
- stream: BaseSocketStream,
+ socket: BaseSocketStream,
on_release: typing.Optional[OnReleaseCallback] = None,
):
- self.stream = stream
+ self.socket = socket
self.on_release = on_release
self.h11_state = h11.Connection(our_role=h11.CLIENT)
self.timeout_flag = TimeoutFlag()
except h11.LocalProtocolError: # pragma: no cover
# Premature client disconnect
pass
- await self.stream.close()
+ await self.socket.close()
async def _send_request(self, request: Request, timeout: Timeout) -> None:
"""
drain before returning.
"""
bytes_to_send = self.h11_state.send(event)
- await self.stream.write(bytes_to_send, timeout)
+ await self.socket.write(bytes_to_send, timeout)
async def _receive_response(
self, timeout: Timeout
+ f"their_state={self.h11_state.their_state} "
+ f"error_status_hint={e.error_status_hint}"
)
- if self.stream.is_connection_dropped():
+ if self.socket.is_connection_dropped():
raise ConnectionClosed(e)
raise ProtocolError(e)
if event is h11.NEED_DATA:
try:
- data = await self.stream.read(
+ data = await self.socket.read(
self.READ_NUM_BYTES, timeout, flag=self.timeout_flag
)
except OSError: # pragma: nocover
return self.h11_state.our_state in (h11.CLOSED, h11.ERROR)
def is_connection_dropped(self) -> bool:
- return self.stream.is_connection_dropped()
+ return self.socket.is_connection_dropped()
def __init__(
self,
- stream: BaseSocketStream,
+ socket: BaseSocketStream,
backend: typing.Union[str, ConcurrencyBackend] = "auto",
on_release: typing.Callable = None,
):
- self.stream = stream
+ self.socket = socket
self.backend = lookup_backend(backend)
self.on_release = on_release
self.h2_state = h2.connection.H2Connection()
)
async def close(self) -> None:
- await self.stream.close()
+ await self.socket.close()
async def send_connection_init(self, timeout: Timeout) -> None:
# Need to set these manually here instead of manipulating via
self.h2_state.initiate_connection()
data_to_send = self.h2_state.data_to_send()
- await self.stream.write(data_to_send, timeout)
+ await self.socket.write(data_to_send, timeout)
async def send_headers(self, request: Request, timeout: Timeout) -> int:
stream_id = self.h2_state.get_next_available_stream_id()
)
self.h2_state.send_headers(stream_id, headers)
data_to_send = self.h2_state.data_to_send()
- await self.stream.write(data_to_send, timeout)
+ await self.socket.write(data_to_send, timeout)
return stream_id
async def send_request_data(
chunk, data = data[:chunk_size], data[chunk_size:]
self.h2_state.send_data(stream_id, chunk)
data_to_send = self.h2_state.data_to_send()
- await self.stream.write(data_to_send, timeout)
+ await self.socket.write(data_to_send, timeout)
async def end_stream(self, stream_id: int, timeout: Timeout) -> None:
logger.trace(f"end_stream stream_id={stream_id}")
self.h2_state.end_stream(stream_id)
data_to_send = self.h2_state.data_to_send()
- await self.stream.write(data_to_send, timeout)
+ await self.socket.write(data_to_send, timeout)
async def receive_response(
self, stream_id: int, timeout: Timeout
async def receive_event(self, stream_id: int, timeout: Timeout) -> h2.events.Event:
while not self.events[stream_id]:
flag = self.timeout_flags[stream_id]
- data = await self.stream.read(self.READ_NUM_BYTES, timeout, flag=flag)
+ data = await self.socket.read(self.READ_NUM_BYTES, timeout, flag=flag)
events = self.h2_state.receive_data(data)
for event in events:
event_stream_id = getattr(event, "stream_id", 0)
self.events[event.stream_id].append(event)
data_to_send = self.h2_state.data_to_send()
- await self.stream.write(data_to_send, timeout)
+ await self.socket.write(data_to_send, timeout)
return self.events[stream_id].pop(0)
return False
def is_connection_dropped(self) -> bool:
- return self.stream.is_connection_dropped()
+ return self.socket.is_connection_dropped()
assert http_connection is not None
assert http_connection.h11_state.our_state == h11.SWITCHED_PROTOCOL
on_release = http_connection.on_release
- stream = http_connection.stream
+ socket = http_connection.socket
# If we need to start TLS again for the target server
# we need to pull the socket stream off the internal
f"proxy_url={self.proxy_url!r} "
f"origin={origin!r}"
)
- stream = await stream.start_tls(
+ socket = await socket.start_tls(
hostname=origin.host, ssl_context=ssl_context, timeout=timeout
)
- http_version = stream.get_http_version()
+ http_version = socket.get_http_version()
logger.trace(
f"tunnel_tls_complete "
f"proxy_url={self.proxy_url!r} "
if http_version == "HTTP/2":
connection.h2_connection = HTTP2Connection(
- stream, self.backend, on_release=on_release
+ socket, self.backend, on_release=on_release
)
else:
assert http_version == "HTTP/1.1"
- connection.h11_connection = HTTP11Connection(stream, on_release=on_release)
+ connection.h11_connection = HTTP11Connection(socket, on_release=on_release)
def should_forward_origin(self, origin: Origin) -> bool:
"""Determines if the given origin should