from .concurrency.base import (
BaseBackgroundManager,
BasePoolSemaphore,
- BaseStream,
+ BaseTCPStream,
ConcurrencyBackend,
)
from .config import (
"TooManyRedirects",
"WriteTimeout",
"AsyncDispatcher",
- "BaseStream",
+ "BaseTCPStream",
"ConcurrencyBackend",
"Dispatcher",
"URL",
-"""
-The `Stream` class here provides a lightweight layer over
-`asyncio.StreamReader` and `asyncio.StreamWriter`.
-
-Similarly `PoolSemaphore` is a lightweight layer over `BoundedSemaphore`.
-
-These classes help encapsulate the timeout logic, make it easier to unit-test
-protocols, and help keep the rest of the package more `async`/`await`
-based, and less strictly `asyncio`-specific.
-"""
import asyncio
import functools
import ssl
BaseEvent,
BasePoolSemaphore,
BaseQueue,
- BaseStream,
+ BaseTCPStream,
ConcurrencyBackend,
TimeoutFlag,
)
MonkeyPatch.write = _fixed_write
-class Stream(BaseStream):
+class TCPStream(BaseTCPStream):
def __init__(
self,
stream_reader: asyncio.StreamReader,
self._loop = asyncio.new_event_loop()
return self._loop
- async def connect(
+ async def open_tcp_stream(
self,
hostname: str,
port: int,
ssl_context: typing.Optional[ssl.SSLContext],
timeout: TimeoutConfig,
- ) -> BaseStream:
+ ) -> BaseTCPStream:
try:
stream_reader, stream_writer = await asyncio.wait_for( # type: ignore
asyncio.open_connection(hostname, port, ssl=ssl_context),
except asyncio.TimeoutError:
raise ConnectTimeout()
- return Stream(
+ return TCPStream(
stream_reader=stream_reader, stream_writer=stream_writer, timeout=timeout
)
async def start_tls(
self,
- stream: BaseStream,
+ stream: BaseTCPStream,
hostname: str,
ssl_context: ssl.SSLContext,
timeout: TimeoutConfig,
- ) -> BaseStream:
+ ) -> BaseTCPStream:
loop = self.loop
if not hasattr(loop, "start_tls"): # pragma: no cover
"asyncio.AbstractEventLoop.start_tls() is only available in Python 3.7+"
)
- assert isinstance(stream, Stream)
+ assert isinstance(stream, TCPStream)
stream_reader = asyncio.StreamReader()
protocol = asyncio.StreamReaderProtocol(stream_reader)
self.raise_on_write_timeout = True
-class BaseStream:
+class BaseTCPStream:
"""
- A stream with read/write operations. Abstracts away any asyncio-specific
+ A TCP stream with read/write operations. Abstracts away any asyncio-specific
interfaces into a more generic base class, that we can use with alternate
backends, or for stand-alone test cases.
"""
class ConcurrencyBackend:
- async def connect(
+ async def open_tcp_stream(
self,
hostname: str,
port: int,
ssl_context: typing.Optional[ssl.SSLContext],
timeout: TimeoutConfig,
- ) -> BaseStream:
+ ) -> BaseTCPStream:
raise NotImplementedError() # pragma: no cover
async def start_tls(
self,
- stream: BaseStream,
+ stream: BaseTCPStream,
hostname: str,
ssl_context: ssl.SSLContext,
timeout: TimeoutConfig,
- ) -> BaseStream:
+ ) -> BaseTCPStream:
raise NotImplementedError() # pragma: no cover
def get_semaphore(self, limits: PoolLimits) -> BasePoolSemaphore:
on_release = functools.partial(self.release_func, self)
logger.debug(f"start_connect host={host!r} port={port!r} timeout={timeout!r}")
- stream = await self.backend.connect(host, port, ssl_context, timeout)
+ stream = await self.backend.open_tcp_stream(host, port, ssl_context, timeout)
http_version = stream.get_http_version()
logger.debug(f"connected http_version={http_version!r}")
import h11
-from ..concurrency.base import BaseStream, ConcurrencyBackend, TimeoutFlag
+from ..concurrency.base import BaseTCPStream, ConcurrencyBackend, TimeoutFlag
from ..config import TimeoutConfig, TimeoutTypes
from ..models import AsyncRequest, AsyncResponse
from ..utils import get_logger
def __init__(
self,
- stream: BaseStream,
+ stream: BaseTCPStream,
backend: ConcurrencyBackend,
on_release: typing.Optional[OnReleaseCallback] = None,
):
import h2.connection
import h2.events
-from ..concurrency.base import BaseEvent, BaseStream, ConcurrencyBackend, TimeoutFlag
+from ..concurrency.base import BaseEvent, BaseTCPStream, ConcurrencyBackend, TimeoutFlag
from ..config import TimeoutConfig, TimeoutTypes
from ..models import AsyncRequest, AsyncResponse
from ..utils import get_logger
def __init__(
self,
- stream: BaseStream,
+ stream: BaseTCPStream,
backend: ConcurrencyBackend,
on_release: typing.Callable = None,
):
import h2.connection
import h2.events
-from httpx import AsyncioBackend, BaseStream, Request, TimeoutConfig
+from httpx import AsyncioBackend, BaseTCPStream, Request, TimeoutConfig
from tests.concurrency import sleep
self.backend = AsyncioBackend() if backend is None else backend
self.server = None
- async def connect(
+ async def open_tcp_stream(
self,
hostname: str,
port: int,
ssl_context: typing.Optional[ssl.SSLContext],
timeout: TimeoutConfig,
- ) -> BaseStream:
+ ) -> BaseTCPStream:
self.server = MockHTTP2Server(self.app, backend=self.backend)
return self.server
return getattr(self.backend, name)
-class MockHTTP2Server(BaseStream):
+class MockHTTP2Server(BaseTCPStream):
def __init__(self, app, backend):
config = h2.config.H2Configuration(client_side=False)
self.conn = h2.connection.H2Connection(config=config)
self.return_data = {}
self.returning = {}
- # Stream interface
+ # TCP stream interface
def get_http_version(self) -> str:
return "HTTP/2"
ctx = SSLConfig().load_ssl_context_no_verify(HTTPVersionConfig())
timeout = TimeoutConfig(5)
- stream = await backend.connect(
+ stream = await backend.open_tcp_stream(
https_server.url.host, https_server.url.port, None, timeout
)