import typing
from typing import (
- Tuple,
Iterable,
- List,
Mapping,
Iterator,
- Dict,
- Union,
- Optional,
Awaitable,
Generator,
AnyStr,
"""
@typing.overload
- def __init__(self, __arg: Mapping[str, List[str]]) -> None:
+ def __init__(self, __arg: Mapping[str, list[str]]) -> None:
pass
@typing.overload # noqa: F811
pass
@typing.overload # noqa: F811
- def __init__(self, *args: Tuple[str, str]) -> None:
+ def __init__(self, *args: tuple[str, str]) -> None:
pass
@typing.overload # noqa: F811
# on demand (and cleared whenever the list is modified).
self._as_list: dict[str, list[str]] = {}
self._combined_cache: dict[str, str] = {}
- self._last_key: Optional[str] = None
+ self._last_key: str | None = None
if len(args) == 1 and len(kwargs) == 0 and isinstance(args[0], HTTPHeaders):
# Copy constructor
for k, v in args[0].get_all():
else:
self[norm_name] = value
- def get_list(self, name: str) -> List[str]:
+ def get_list(self, name: str) -> list[str]:
"""Returns all values for the given header as a list."""
norm_name = _normalize_header(name)
return self._as_list.get(norm_name, [])
- def get_all(self) -> Iterable[Tuple[str, str]]:
+ def get_all(self) -> Iterable[tuple[str, str]]:
"""Returns an iterable of all (name, value) pairs.
If a header has multiple values, multiple pairs will be
)
@classmethod
- def parse(cls, headers: str, *, _chars_are_bytes: bool = True) -> "HTTPHeaders":
+ def parse(cls, headers: str, *, _chars_are_bytes: bool = True) -> HTTPHeaders:
"""Returns a dictionary from HTTP header text.
>>> h = HTTPHeaders.parse("Content-Type: text/html\\r\\nContent-Length: 42\\r\\n")
def __iter__(self) -> Iterator[typing.Any]:
return iter(self._as_list)
- def copy(self) -> "HTTPHeaders":
+ def copy(self) -> HTTPHeaders:
# defined in dict but not in MutableMapping.
return HTTPHeaders(self)
query: str
# HACK: Used for stream_request_body
- _body_future: "Future[None]"
+ _body_future: Future[None]
def __init__(
self,
- method: Optional[str] = None,
- uri: Optional[str] = None,
+ method: str | None = None,
+ uri: str | None = None,
version: str = "HTTP/1.0",
- headers: Optional[HTTPHeaders] = None,
- body: Optional[bytes] = None,
- host: Optional[str] = None,
- files: Optional[Dict[str, List["HTTPFile"]]] = None,
- connection: Optional["HTTPConnection"] = None,
- start_line: Optional["RequestStartLine"] = None,
- server_connection: Optional[object] = None,
+ headers: HTTPHeaders | None = None,
+ body: bytes | None = None,
+ host: str | None = None,
+ files: dict[str, list[HTTPFile]] | None = None,
+ connection: HTTPConnection | None = None,
+ start_line: RequestStartLine | None = None,
+ server_connection: object | None = None,
) -> None:
if start_line is not None:
method, uri, version = start_line
self.path, sep, self.query = uri.partition("?")
self.arguments = parse_qs_bytes(self.query, keep_blank_values=True)
self.query_arguments = copy.deepcopy(self.arguments)
- self.body_arguments: Dict[str, List[bytes]] = {}
+ self.body_arguments: dict[str, list[bytes]] = {}
@property
- def cookies(self) -> Dict[str, http.cookies.Morsel]:
+ def cookies(self) -> dict[str, http.cookies.Morsel]:
"""A dictionary of ``http.cookies.Morsel`` objects."""
if not hasattr(self, "_cookies"):
self._cookies: http.cookies.SimpleCookie = http.cookies.SimpleCookie()
else:
return self._finish_time - self._start_time
- def get_ssl_certificate(
- self, binary_form: bool = False
- ) -> Union[None, Dict, bytes]:
+ def get_ssl_certificate(self, binary_form: bool = False) -> None | dict | bytes:
"""Returns the client's SSL certificate, if any.
To use client certificates, the HTTPServer's
"""
def start_request(
- self, server_conn: object, request_conn: "HTTPConnection"
- ) -> "HTTPMessageDelegate":
+ self, server_conn: object, request_conn: HTTPConnection
+ ) -> HTTPMessageDelegate:
"""This method is called by the server when a new request has started.
:arg server_conn: is an opaque object representing the long-lived
# TODO: genericize this class to avoid exposing the Union.
def headers_received(
self,
- start_line: Union["RequestStartLine", "ResponseStartLine"],
+ start_line: RequestStartLine | ResponseStartLine,
headers: HTTPHeaders,
- ) -> Optional[Awaitable[None]]:
+ ) -> Awaitable[None] | None:
"""Called when the HTTP headers have been received and parsed.
:arg start_line: a `.RequestStartLine` or `.ResponseStartLine`
"""
pass
- def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]:
+ def data_received(self, chunk: bytes) -> Awaitable[None] | None:
"""Called when a chunk of data has been received.
May return a `.Future` for flow control.
def write_headers(
self,
- start_line: Union["RequestStartLine", "ResponseStartLine"],
+ start_line: RequestStartLine | ResponseStartLine,
headers: HTTPHeaders,
- chunk: Optional[bytes] = None,
- ) -> "Future[None]":
+ chunk: bytes | None = None,
+ ) -> Future[None]:
"""Write an HTTP header block.
:arg start_line: a `.RequestStartLine` or `.ResponseStartLine`.
"""
raise NotImplementedError()
- def write(self, chunk: bytes) -> "Future[None]":
+ def write(self, chunk: bytes) -> Future[None]:
"""Writes a chunk of body data.
Returns a future for flow control.
def url_concat(
url: str,
- args: Union[
- None, Dict[str, str], List[Tuple[str, str]], Tuple[Tuple[str, str], ...]
- ],
+ args: None | dict[str, str] | list[tuple[str, str]] | tuple[tuple[str, str], ...],
) -> str:
"""Concatenate url and arguments regardless of whether
url has existing query parameters.
def _parse_request_range(
range_header: str,
-) -> Optional[Tuple[Optional[int], Optional[int]]]:
+) -> tuple[int | None, int | None] | None:
"""Parses a Range header.
Returns either ``None`` or tuple ``(start, end)``.
return (start, end)
-def _get_content_range(start: Optional[int], end: Optional[int], total: int) -> str:
+def _get_content_range(start: int | None, end: int | None, total: int) -> str:
"""Returns a suitable Content-Range header:
>>> print(_get_content_range(None, 1, 4))
return f"bytes {start}-{end}/{total}"
-def _int_or_none(val: str) -> Optional[int]:
+def _int_or_none(val: str) -> int | None:
val = val.strip()
if val == "":
return None
def parse_body_arguments(
content_type: str,
body: bytes,
- arguments: Dict[str, List[bytes]],
- files: Dict[str, List[HTTPFile]],
- headers: Optional[HTTPHeaders] = None,
+ arguments: dict[str, list[bytes]],
+ files: dict[str, list[HTTPFile]],
+ headers: HTTPHeaders | None = None,
*,
- config: Optional[ParseBodyConfig] = None,
+ config: ParseBodyConfig | None = None,
) -> None:
"""Parses a form request body.
def parse_multipart_form_data(
boundary: bytes,
data: bytes,
- arguments: Dict[str, List[bytes]],
- files: Dict[str, List[HTTPFile]],
+ arguments: dict[str, list[bytes]],
+ files: dict[str, list[HTTPFile]],
*,
- config: Optional[ParseMultipartConfig] = None,
+ config: ParseMultipartConfig | None = None,
) -> None:
"""Parses a ``multipart/form-data`` body.
def format_timestamp(
- ts: Union[int, float, tuple, time.struct_time, datetime.datetime],
+ ts: int | float | tuple | time.struct_time | datetime.datetime,
) -> str:
"""Formats a timestamp in the format used by HTTP.
# RFCs for multipart/form-data) before making this change.
-def _parseparam(s: str) -> Generator[str, None, None]:
+def _parseparam(s: str) -> Generator[str]:
start = 0
while s.find(";", start) == start:
start += 1
start = end
-def _parse_header(line: str) -> Tuple[str, Dict[str, str]]:
+def _parse_header(line: str) -> tuple[str, dict[str, str]]:
r"""Parse a Content-type like header.
Return the main content-type and a dictionary of options.
return key, pdict
-def _encode_header(key: str, pdict: Dict[str, str]) -> str:
+def _encode_header(key: str, pdict: dict[str, str]) -> str:
"""Inverse of _parse_header.
>>> _encode_header('permessage-deflate',
return "; ".join(out)
-def encode_username_password(
- username: Union[str, bytes], password: Union[str, bytes]
-) -> bytes:
+def encode_username_password(username: str | bytes, password: str | bytes) -> bytes:
"""Encodes a username/password pair in the format used by HTTP auth.
The return value is a byte string in the form ``username:password``.
_netloc_re = re.compile(r"^(.+):(\d+)$")
-def split_host_and_port(netloc: str) -> Tuple[str, Optional[int]]:
+def split_host_and_port(netloc: str) -> tuple[str, int | None]:
"""Returns ``(host, port)`` tuple from ``netloc``.
Returned ``port`` will be ``None`` if not present.
match = _netloc_re.match(netloc)
if match:
host = match.group(1)
- port: Optional[int] = int(match.group(2))
+ port: int | None = int(match.group(2))
else:
host = netloc
port = None
return (host, port)
-def qs_to_qsl(qs: Dict[str, List[AnyStr]]) -> Iterable[Tuple[str, AnyStr]]:
+def qs_to_qsl(qs: dict[str, list[AnyStr]]) -> Iterable[tuple[str, AnyStr]]:
"""Generator converting a result of ``parse_qs`` back to name-value pairs.
.. versionadded:: 5.0
return _unquote_sub(_unquote_replace, s)
-def parse_cookie(cookie: str) -> Dict[str, str]:
+def parse_cookie(cookie: str) -> dict[str, str]:
"""Parse a ``Cookie`` HTTP header into a dict of name/value pairs.
This function attempts to mimic browser cookie parsing behavior;
from tornado.util import Configurable, TimeoutError, import_object
import typing
-from typing import Union, Any, Type, Optional, Callable, TypeVar, Tuple, Awaitable
+from typing import Union, Any, Type, Optional, Callable, TypeVar, Awaitable
if typing.TYPE_CHECKING:
from typing import Dict, List, Set, TypedDict # noqa: F401
ERROR = 0x018
# In Python 3, _ioloop_for_asyncio maps from asyncio loops to IOLoops.
- _ioloop_for_asyncio: Dict[asyncio.AbstractEventLoop, IOLoop] = dict()
+ _ioloop_for_asyncio: dict[asyncio.AbstractEventLoop, IOLoop] = dict()
# Maintain a set of all pending tasks to follow the warning in the docs
# of asyncio.create_tasks:
# https://github.com/python/cpython/issues/91887
# If that change is accepted, this can eventually be removed.
# If it is not, we will consider the rationale and may remove this.
- _pending_tasks: Set[Future] = set()
+ _pending_tasks: set[Future] = set()
@classmethod
def configure(
- cls, impl: "Union[None, str, Type[Configurable]]", **kwargs: Any
+ cls, impl: Union[None, str, Type[Configurable]], **kwargs: Any
) -> None:
from tornado.platform.asyncio import BaseAsyncIOLoop
super().configure(impl, **kwargs)
@staticmethod
- def instance() -> "IOLoop":
+ def instance() -> IOLoop:
"""Deprecated alias for `IOLoop.current()`.
.. versionchanged:: 5.0
@typing.overload
@staticmethod
- def current() -> "IOLoop":
+ def current() -> IOLoop:
pass
@typing.overload
@staticmethod
- def current(instance: bool = True) -> Optional["IOLoop"]: # noqa: F811
+ def current(instance: bool = True) -> IOLoop | None: # noqa: F811
pass
@staticmethod
- def current(instance: bool = True) -> Optional["IOLoop"]: # noqa: F811
+ def current(instance: bool = True) -> IOLoop | None: # noqa: F811
"""Returns the current thread's `IOLoop`.
If an `IOLoop` is currently running or has been marked as
if instance:
from tornado.platform.asyncio import AsyncIOMainLoop
- current: Optional[IOLoop] = AsyncIOMainLoop()
+ current: IOLoop | None = AsyncIOMainLoop()
else:
current = None
return current
pass
@classmethod
- def configurable_base(cls) -> Type[Configurable]:
+ def configurable_base(cls) -> type[Configurable]:
return IOLoop
@classmethod
- def configurable_default(cls) -> Type[Configurable]:
+ def configurable_default(cls) -> type[Configurable]:
from tornado.platform.asyncio import AsyncIOLoop
return AsyncIOLoop
pass
def add_handler( # noqa: F811
- self, fd: Union[int, _Selectable], handler: Callable[..., None], events: int
+ self, fd: int | _Selectable, handler: Callable[..., None], events: int
) -> None:
"""Registers the given handler to receive the given events for ``fd``.
"""
raise NotImplementedError()
- def update_handler(self, fd: Union[int, _Selectable], events: int) -> None:
+ def update_handler(self, fd: int | _Selectable, events: int) -> None:
"""Changes the events we listen for ``fd``.
.. versionchanged:: 4.0
"""
raise NotImplementedError()
- def remove_handler(self, fd: Union[int, _Selectable]) -> None:
+ def remove_handler(self, fd: int | _Selectable) -> None:
"""Stop listening for events on ``fd``.
.. versionchanged:: 4.0
"""
raise NotImplementedError()
- def run_sync(self, func: Callable, timeout: Optional[float] = None) -> Any:
+ def run_sync(self, func: Callable, timeout: float | None = None) -> Any:
"""Starts the `IOLoop`, runs the given function, and stops the loop.
The function must return either an awaitable object or
``tornado.util.TimeoutError`` is now an alias to ``asyncio.TimeoutError``.
"""
if typing.TYPE_CHECKING:
- FutureCell = TypedDict( # noqa: F841
- "FutureCell", {"future": Optional[Future], "timeout_called": bool}
- )
+
+ class FutureCell(TypedDict):
+ # noqa: F841
+ future: Optional[Future]
+ timeout_called: bool
+
future_cell: FutureCell = {"future": None, "timeout_called": False}
def run() -> None:
def add_timeout(
self,
- deadline: Union[float, datetime.timedelta],
+ deadline: float | datetime.timedelta,
callback: Callable,
*args: Any,
**kwargs: Any,
def add_future(
self,
- future: "Union[Future[_T], concurrent.futures.Future[_T]]",
- callback: Callable[["Future[_T]"], None],
+ future: Union[Future[_T], concurrent.futures.Future[_T]],
+ callback: Callable[[Future[_T]], None],
) -> None:
"""Schedules a callback on the ``IOLoop`` when the given
`.Future` is finished.
def run_in_executor(
self,
- executor: Optional[concurrent.futures.Executor],
+ executor: concurrent.futures.Executor | None,
func: Callable[..., _T],
*args: Any,
- ) -> "Future[_T]":
+ ) -> Future[_T]:
"""Runs a function in a ``concurrent.futures.Executor``. If
``executor`` is ``None``, the IO loop's default executor will be used.
"""Avoid unhandled-exception warnings from spawned coroutines."""
future.result()
- def split_fd(
- self, fd: Union[int, _Selectable]
- ) -> Tuple[int, Union[int, _Selectable]]:
+ def split_fd(self, fd: int | _Selectable) -> tuple[int, int | _Selectable]:
# """Returns an (fd, obj) pair from an ``fd`` parameter.
# We accept both raw file descriptors and file-like objects as
return fd, fd
return fd.fileno(), fd
- def close_fd(self, fd: Union[int, _Selectable]) -> None:
+ def close_fd(self, fd: int | _Selectable) -> None:
# """Utility method to close an ``fd``.
# If ``fd`` is a file-like object, we close it directly; otherwise
raise TypeError("Unsupported deadline %r" % deadline)
self.deadline = deadline
self.callback = callback
- self.tdeadline: Tuple[float, int] = (
+ self.tdeadline: tuple[float, int] = (
deadline,
next(io_loop._timeout_counter),
)
# to guarantee a consistent ordering. The heapq module uses __le__
# in python2.5, and __lt__ in 2.6+ (sort() and most other comparisons
# use __lt__).
- def __lt__(self, other: "_Timeout") -> bool:
+ def __lt__(self, other: _Timeout) -> bool:
return self.tdeadline < other.tdeadline
- def __le__(self, other: "_Timeout") -> bool:
+ def __le__(self, other: _Timeout) -> bool:
return self.tdeadline <= other.tdeadline
def __init__(
self,
- callback: Callable[[], Optional[Awaitable]],
- callback_time: Union[datetime.timedelta, float],
+ callback: Callable[[], Awaitable | None],
+ callback_time: datetime.timedelta | float,
jitter: float = 0,
) -> None:
self.callback = callback
from tornado._locale_data import LOCALE_NAMES
-from typing import Iterable, Any, Union, Dict, Optional
+from typing import Iterable, Any
_default_locale = "en_US"
-_translations: Dict[str, Any] = {}
+_translations: dict[str, Any] = {}
_supported_locales = frozenset([_default_locale])
_use_gettext = False
CONTEXT_SEPARATOR = "\x04"
-def get(*locale_codes: str) -> "Locale":
+def get(*locale_codes: str) -> Locale:
"""Returns the closest match for the given locale codes.
We iterate over all given locale codes in order. If we have a tight
_supported_locales = frozenset(list(_translations.keys()) + [_default_locale])
-def load_translations(directory: str, encoding: Optional[str] = None) -> None:
+def load_translations(directory: str, encoding: str | None = None) -> None:
"""Loads translations from CSV files in a directory.
Translations are strings with optional Python-style named placeholders
call `get` or `get_closest` to get a Locale object.
"""
- _cache: Dict[str, Locale] = {}
+ _cache: dict[str, Locale] = {}
@classmethod
- def get_closest(cls, *locale_codes: str) -> "Locale":
+ def get_closest(cls, *locale_codes: str) -> Locale:
"""Returns the closest match for the given locale code."""
for code in locale_codes:
if not code:
return cls.get(_default_locale)
@classmethod
- def get(cls, code: str) -> "Locale":
+ def get(cls, code: str) -> Locale:
"""Returns the Locale for the given locale code.
If it is not supported, we raise an exception.
def translate(
self,
message: str,
- plural_message: Optional[str] = None,
- count: Optional[int] = None,
+ plural_message: str | None = None,
+ count: int | None = None,
) -> str:
"""Returns the translation for the given message for this locale.
self,
context: str,
message: str,
- plural_message: Optional[str] = None,
- count: Optional[int] = None,
+ plural_message: str | None = None,
+ count: int | None = None,
) -> str:
raise NotImplementedError()
def format_date(
self,
- date: Union[int, float, datetime.datetime],
+ date: int | float | datetime.datetime,
gmt_offset: int = 0,
relative: bool = True,
shorter: bool = False,
class CSVLocale(Locale):
"""Locale implementation using tornado's CSV translation format."""
- def __init__(self, code: str, translations: Dict[str, Dict[str, str]]) -> None:
+ def __init__(self, code: str, translations: dict[str, dict[str, str]]) -> None:
self.translations = translations
super().__init__(code)
def translate(
self,
message: str,
- plural_message: Optional[str] = None,
- count: Optional[int] = None,
+ plural_message: str | None = None,
+ count: int | None = None,
) -> str:
if plural_message is not None:
assert count is not None
self,
context: str,
message: str,
- plural_message: Optional[str] = None,
- count: Optional[int] = None,
+ plural_message: str | None = None,
+ count: int | None = None,
) -> str:
if self.translations:
gen_log.warning("pgettext is not supported by CSVLocale")
def translate(
self,
message: str,
- plural_message: Optional[str] = None,
- count: Optional[int] = None,
+ plural_message: str | None = None,
+ count: int | None = None,
) -> str:
if plural_message is not None:
assert count is not None
self,
context: str,
message: str,
- plural_message: Optional[str] = None,
- count: Optional[int] = None,
+ plural_message: str | None = None,
+ count: int | None = None,
) -> str:
"""Allows to set context for translation, accepts plural forms.
from tornado.concurrent import Future, future_set_result_unless_cancelled
from tornado.locks import Event
-from typing import Union, TypeVar, Generic, Awaitable, Optional
+from typing import TypeVar, Generic, Awaitable
import typing
if typing.TYPE_CHECKING:
pass
-def _set_timeout(
- future: Future, timeout: Union[None, float, datetime.timedelta]
-) -> None:
+def _set_timeout(future: Future, timeout: None | float | datetime.timedelta) -> None:
if timeout:
def on_timeout() -> None:
class _QueueIterator(Generic[_T]):
- def __init__(self, q: "Queue[_T]") -> None:
+ def __init__(self, q: Queue[_T]) -> None:
self.q = q
def __anext__(self) -> Awaitable[_T]:
self._maxsize = maxsize
self._init()
self._getters: Deque[Future[_T]] = collections.deque([])
- self._putters: Deque[Tuple[_T, Future[None]]] = collections.deque([])
+ self._putters: Deque[tuple[_T, Future[None]]] = collections.deque([])
self._unfinished_tasks = 0
self._finished = Event()
self._finished.set()
return self.qsize() >= self.maxsize
def put(
- self, item: _T, timeout: Optional[Union[float, datetime.timedelta]] = None
- ) -> "Future[None]":
+ self, item: _T, timeout: float | datetime.timedelta | None = None
+ ) -> Future[None]:
"""Put an item into the queue, perhaps waiting until there is room.
Returns a Future, which raises `tornado.util.TimeoutError` after a
else:
self.__put_internal(item)
- def get(
- self, timeout: Optional[Union[float, datetime.timedelta]] = None
- ) -> Awaitable[_T]:
+ def get(self, timeout: float | datetime.timedelta | None = None) -> Awaitable[_T]:
"""Remove and return an item from the queue.
Returns an awaitable which resolves once an item is available, or raises
self._finished.set()
def join(
- self, timeout: Optional[Union[float, datetime.timedelta]] = None
+ self, timeout: float | datetime.timedelta | None = None
) -> Awaitable[None]:
"""Block until all items in the queue are processed.
from typing import (
Any,
- Optional,
Dict,
Mapping,
- List,
- Tuple,
Match,
Callable,
Type,
def exec_in(
- code: Any, glob: Dict[str, Any], loc: Optional[Optional[Mapping[str, Any]]] = None
+ code: Any, glob: dict[str, Any], loc: Mapping[str, Any] | None | None = None
) -> None:
if isinstance(code, str):
# exec(string) inherits the caller's future imports; compile
def raise_exc_info(
- exc_info: Tuple[Optional[type], Optional[BaseException], Optional["TracebackType"]],
+ exc_info: tuple[type | None, BaseException | None, TracebackType | None],
) -> typing.NoReturn:
try:
if exc_info[1] is not None:
exc_info = (None, None, None)
-def errno_from_exception(e: BaseException) -> Optional[int]:
+def errno_from_exception(e: BaseException) -> int | None:
"""Provides the errno from an Exception object.
There are cases that the errno attribute was not set so we pull
# There may be a clever way to use generics here to get more
# precise types (i.e. for a particular Configurable subclass T,
# all the types are subclasses of T, not just Configurable).
- __impl_class: Optional[Type["Configurable"]] = None
- __impl_kwargs: Optional[Dict[str, Any]] = None
+ __impl_class: type[Configurable] | None = None
+ __impl_kwargs: dict[str, Any] | None = None
def __new__(cls, *args: Any, **kwargs: Any) -> Any:
base = cls.configurable_base()
- init_kwargs: Dict[str, Any] = {}
+ init_kwargs: dict[str, Any] = {}
if cls is base:
impl = cls.configured_class()
if base.__impl_kwargs:
return instance
@classmethod
- def configurable_base(cls) -> Type[Configurable]:
+ def configurable_base(cls) -> type[Configurable]:
"""Returns the base class of a configurable hierarchy.
This will normally return the class in which it is defined.
raise NotImplementedError()
@classmethod
- def configurable_default(cls) -> Type[Configurable]:
+ def configurable_default(cls) -> type[Configurable]:
"""Returns the implementation class to be used if none is configured."""
raise NotImplementedError()
"""
@classmethod
- def configure(
- cls, impl: Union[None, str, Type[Configurable]], **kwargs: Any
- ) -> None:
+ def configure(cls, impl: None | str | type[Configurable], **kwargs: Any) -> None:
"""Sets the class to use when the base class is instantiated.
Keyword arguments will be saved and added to the arguments passed
base.__impl_kwargs = kwargs
@classmethod
- def configured_class(cls) -> Type[Configurable]:
+ def configured_class(cls) -> type[Configurable]:
"""Returns the currently configured class."""
base = cls.configurable_base()
# Manually mangle the private name to see whether this base
@classmethod
def _save_configuration(
cls,
- ) -> Tuple[Optional[Type[Configurable]], Optional[Dict[str, Any]]]:
+ ) -> tuple[type[Configurable] | None, dict[str, Any] | None]:
base = cls.configurable_base()
return (base.__impl_class, base.__impl_kwargs)
@classmethod
def _restore_configuration(
- cls, saved: Tuple[Optional[Type[Configurable]], Optional[Dict[str, Any]]]
+ cls, saved: tuple[type[Configurable] | None, dict[str, Any] | None]
) -> None:
base = cls.configurable_base()
base.__impl_class = saved[0]
def __init__(self, func: Callable, name: str) -> None:
self.name = name
try:
- self.arg_pos: Optional[int] = self._getargnames(func).index(name)
+ self.arg_pos: int | None = self._getargnames(func).index(name)
except ValueError:
# Not a positional parameter
self.arg_pos = None
- def _getargnames(self, func: Callable) -> List[str]:
+ def _getargnames(self, func: Callable) -> list[str]:
try:
return getfullargspec(func).args
except TypeError:
raise
def get_old_value(
- self, args: Sequence[Any], kwargs: Dict[str, Any], default: Any = None
+ self, args: Sequence[Any], kwargs: dict[str, Any], default: Any = None
) -> Any:
"""Returns the old value of the named argument without replacing it.
return kwargs.get(self.name, default)
def replace(
- self, new_value: Any, args: Sequence[Any], kwargs: Dict[str, Any]
- ) -> Tuple[Any, Sequence[Any], Dict[str, Any]]:
+ self, new_value: Any, args: Sequence[Any], kwargs: dict[str, Any]
+ ) -> tuple[Any, Sequence[Any], dict[str, Any]]:
"""Replace the named argument in ``args, kwargs`` with ``new_value``.
Returns ``(old_value, args, kwargs)``. The returned ``args`` and