await self._stream.aclose()
+EventHook = typing.Callable[..., typing.Any]
+
+
class BaseClient:
def __init__(
self,
follow_redirects: bool = False,
max_redirects: int = DEFAULT_MAX_REDIRECTS,
event_hooks: typing.Optional[
- typing.Mapping[str, typing.List[typing.Callable]]
+ typing.Mapping[str, typing.List[EventHook]]
] = None,
base_url: URLTypes = "",
trust_env: bool = True,
self._timeout = Timeout(timeout)
@property
- def event_hooks(self) -> typing.Dict[str, typing.List[typing.Callable]]:
+ def event_hooks(self) -> typing.Dict[str, typing.List[EventHook]]:
return self._event_hooks
@event_hooks.setter
def event_hooks(
- self, event_hooks: typing.Dict[str, typing.List[typing.Callable]]
+ self, event_hooks: typing.Dict[str, typing.List[EventHook]]
) -> None:
self._event_hooks = {
"request": list(event_hooks.get("request", [])),
limits: Limits = DEFAULT_LIMITS,
max_redirects: int = DEFAULT_MAX_REDIRECTS,
event_hooks: typing.Optional[
- typing.Mapping[str, typing.List[typing.Callable]]
+ typing.Mapping[str, typing.List[EventHook]]
] = None,
base_url: URLTypes = "",
transport: typing.Optional[BaseTransport] = None,
- app: typing.Optional[typing.Callable] = None,
+ app: typing.Optional[typing.Callable[..., typing.Any]] = None,
trust_env: bool = True,
default_encoding: typing.Union[str, typing.Callable[[bytes], str]] = "utf-8",
):
http2: bool = False,
limits: Limits = DEFAULT_LIMITS,
transport: typing.Optional[BaseTransport] = None,
- app: typing.Optional[typing.Callable] = None,
+ app: typing.Optional[typing.Callable[..., typing.Any]] = None,
trust_env: bool = True,
) -> BaseTransport:
if transport is not None:
limits: Limits = DEFAULT_LIMITS,
max_redirects: int = DEFAULT_MAX_REDIRECTS,
event_hooks: typing.Optional[
- typing.Mapping[str, typing.List[typing.Callable]]
+ typing.Mapping[str, typing.List[typing.Callable[..., typing.Any]]]
] = None,
base_url: URLTypes = "",
transport: typing.Optional[AsyncBaseTransport] = None,
- app: typing.Optional[typing.Callable] = None,
+ app: typing.Optional[typing.Callable[..., typing.Any]] = None,
trust_env: bool = True,
default_encoding: typing.Union[str, typing.Callable[[bytes], str]] = "utf-8",
):
http2: bool = False,
limits: Limits = DEFAULT_LIMITS,
transport: typing.Optional[AsyncBaseTransport] = None,
- app: typing.Optional[typing.Callable] = None,
+ app: typing.Optional[typing.Callable[..., typing.Any]] = None,
trust_env: bool = True,
) -> AsyncBaseTransport:
if transport is not None: