BaseWriter,
ConcurrencyBackend,
Dispatcher,
- Protocol,
)
from .models import (
URL,
"BaseWriter",
"ConcurrencyBackend",
"Dispatcher",
- "Protocol",
"URL",
"URLTypes",
"StatusCode",
BaseReader,
BaseWriter,
ConcurrencyBackend,
- Protocol,
)
SSL_MONKEY_PATCH_APPLIED = False
port: int,
ssl_context: typing.Optional[ssl.SSLContext],
timeout: TimeoutConfig,
- ) -> typing.Tuple[BaseReader, BaseWriter, Protocol]:
+ ) -> typing.Tuple[BaseReader, BaseWriter, str]:
try:
stream_reader, stream_writer = await asyncio.wait_for( # type: ignore
asyncio.open_connection(hostname, port, ssl=ssl_context),
reader = Reader(stream_reader=stream_reader, timeout=timeout)
writer = Writer(stream_writer=stream_writer, timeout=timeout)
- protocol = Protocol.HTTP_2 if ident == "h2" else Protocol.HTTP_11
+ http_version = "HTTP/2" if ident == "h2" else "HTTP/1.1"
- return reader, writer, protocol
+ return reader, writer, http_version
async def run_in_threadpool(
self, func: typing.Callable, *args: typing.Any, **kwargs: typing.Any
TimeoutTypes,
VerifyTypes,
)
-from ..interfaces import AsyncDispatcher, ConcurrencyBackend, Protocol
+from ..interfaces import AsyncDispatcher, ConcurrencyBackend
from ..models import AsyncRequest, AsyncResponse, Origin
from .http2 import HTTP2Connection
from .http11 import HTTP11Connection
else:
on_release = functools.partial(self.release_func, self)
- reader, writer, protocol = await self.backend.connect(
+ reader, writer, http_version = await self.backend.connect(
host, port, ssl_context, timeout
)
- if protocol == Protocol.HTTP_2:
+ if http_version == "HTTP/2":
self.h2_connection = HTTP2Connection(
reader, writer, self.backend, on_release=on_release
)
else:
+ assert http_version == "HTTP/1.1"
self.h11_connection = HTTP11Connection(
reader, writer, self.backend, on_release=on_release
)
)
-class Protocol(str, enum.Enum):
- HTTP_11 = "HTTP/1.1"
- HTTP_2 = "HTTP/2"
-
-
class AsyncDispatcher:
"""
Base class for async dispatcher classes, that handle sending the request.
port: int,
ssl_context: typing.Optional[ssl.SSLContext],
timeout: TimeoutConfig,
- ) -> typing.Tuple[BaseReader, BaseWriter, Protocol]:
+ ) -> typing.Tuple[BaseReader, BaseWriter, str]:
raise NotImplementedError() # pragma: no cover
def get_semaphore(self, limits: PoolLimits) -> BasePoolSemaphore:
import h2.connection
import h2.events
-from httpx import (
- AsyncioBackend,
- BaseReader,
- BaseWriter,
- Protocol,
- Request,
- TimeoutConfig,
-)
+from httpx import AsyncioBackend, BaseReader, BaseWriter, Request, TimeoutConfig
class MockHTTP2Backend(AsyncioBackend):
port: int,
ssl_context: typing.Optional[ssl.SSLContext],
timeout: TimeoutConfig,
- ) -> typing.Tuple[BaseReader, BaseWriter, Protocol]:
+ ) -> typing.Tuple[BaseReader, BaseWriter, str]:
self.server = MockHTTP2Server(self.app)
- return self.server, self.server, Protocol.HTTP_2
+ return self.server, self.server, "HTTP/2"
class MockHTTP2Server(BaseReader, BaseWriter):