--- /dev/null
+.. change::
+ :tags: bug, pep484
+ :tickets: 6461
+
+ Remove pep484 types from the code.
+ Current effort is around the stub package, and having typing in
+ two places makes thing worse, since the types in the SQLAlchemy
+ source were usually outdated compared to the version in the stubs.
from ...types import BOOLEAN
from ...types import DATE
from ...types import VARBINARY
-from ...util import compat
from ...util import topological
-if compat.TYPE_CHECKING:
- from typing import Any
-
RESERVED_WORDS = set(
[
"accessible",
return parser.parse(sql, charset)
def _detect_charset(self, connection):
- # type: (Any) -> str
raise NotImplementedError()
def _detect_casing(self, connection):
from .. import util
from ..sql.compiler import Compiled # noqa
from ..sql.compiler import TypeCompiler # noqa
-from ..util import compat
-
-if compat.TYPE_CHECKING:
- from typing import Any
-
- from .url import URL
class Dialect(object):
""" # noqa: E501
def __init__(self, url, kwargs):
- # type: (URL, dict[str, Any]) -> None
"""Construct a new :class:`.CreateEnginePlugin`.
The plugin object is instantiated individually for each call
return self._metadata._row_as_tuple_getter(keys)
def mappings(self):
- # type() -> MappingResult
"""Apply a mappings filter to returned rows, returning an instance of
:class:`_result.MappingResult`.
from ..util import compat
-if compat.TYPE_CHECKING:
- from typing import Mapping
- from typing import Optional
- from typing import Sequence
- from typing import Tuple
- from typing import Union
-
-
class URL(
util.namedtuple(
"URL",
@classmethod
def create(
cls,
- drivername, # type: str
- username=None, # type: Optional[str]
- password=None, # type: Optional[Union[str, object]]
- host=None, # type: Optional[str]
- port=None, # type: Optional[int]
- database=None, # type: Optional[str]
- query=util.EMPTY_DICT, # type: Mapping[str, Union[str, Sequence[str]]]
+ drivername,
+ username=None,
+ password=None,
+ host=None,
+ port=None,
+ database=None,
+ query=util.EMPTY_DICT,
):
- # type: (...) -> URL
"""Create a new :class:`_engine.URL` object.
:param drivername: the name of the database backend. This name will
def set(
self,
- drivername=None, # type: Optional[str]
- username=None, # type: Optional[str]
- password=None, # type: Optional[Union[str, object]]
- host=None, # type: Optional[str]
- port=None, # type: Optional[int]
- database=None, # type: Optional[str]
- query=None, # type: Optional[Mapping[str, Union[str, Sequence[str]]]]
+ drivername=None,
+ username=None,
+ password=None,
+ host=None,
+ port=None,
+ database=None,
+ query=None,
):
- # type: (...) -> URL
"""return a new :class:`_engine.URL` object with modifications.
Values are used if they are non-None. To set a value to ``None``
return self._replace(**kw)
def _replace(self, **kw):
- # type: (**object) -> URL
"""Override ``namedtuple._replace()`` to provide argument checking."""
if "drivername" in kw:
return super(URL, self)._replace(**kw)
def update_query_string(self, query_string, append=False):
- # type: (str, bool) -> URL
"""Return a new :class:`_engine.URL` object with the :attr:`_engine.URL.query`
parameter dictionary updated by the given query string.
)
def update_query_pairs(self, key_value_pairs, append=False):
- # type: (Sequence[Tuple[str, str]], bool) -> URL
"""Return a new :class:`_engine.URL` object with the
:attr:`_engine.URL.query`
parameter dictionary updated by the given sequence of key/value pairs
return self.set(query=new_query)
def update_query_dict(self, query_parameters, append=False):
- # type: (Mapping[str, Union[str, Sequence[str]]], bool) -> URL
"""Return a new :class:`_engine.URL` object with the
:attr:`_engine.URL.query` parameter dictionary updated by the given
dictionary.
return self.update_query_pairs(query_parameters.items(), append=append)
def difference_update_query(self, names):
- # type: (Sequence[str]) -> URL
"""
Remove the given names from the :attr:`_engine.URL.query` dictionary,
returning the new :class:`_engine.URL`.
":meth:`_engine.URL.render_as_string` method.",
)
def __to_string__(self, hide_password=True):
- # type: (bool) -> str
"""Render this :class:`_engine.URL` object as a string.
:param hide_password: Defaults to True. The password is not shown
return self.render_as_string(hide_password=hide_password)
def render_as_string(self, hide_password=True):
- # type: (bool) -> str
"""Render this :class:`_engine.URL` object as a string.
This method is used when the ``__str__()`` or ``__repr__()``
class StartableContext(abc.ABC):
@abc.abstractmethod
- async def start(self, is_ctxmanager=False) -> "StartableContext":
+ async def start(self, is_ctxmanager=False):
pass
def __await__(self):
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-
-from typing import Any
-from typing import Callable
-from typing import Mapping
-from typing import Optional
-
from . import exc as async_exc
from .base import ProxyComparable
from .base import StartableContext
from ... import exc
from ... import util
from ...engine import create_engine as _create_engine
-from ...engine import Result
-from ...engine import Transaction
from ...future import Connection
from ...future import Engine
-from ...sql import Executable
from ...util.concurrency import greenlet_spawn
"sync_connection",
)
- def __init__(
- self,
- async_engine: "AsyncEngine",
- sync_connection: Optional[Connection] = None,
- ):
+ def __init__(self, async_engine, sync_connection=None):
self.engine = async_engine
self.sync_engine = async_engine.sync_engine
self.sync_connection = sync_connection
self._raise_for_not_started()
return self.sync_connection
- def begin(self) -> "AsyncTransaction":
+ def begin(self):
"""Begin a transaction prior to autobegin occurring."""
self._sync_connection()
return AsyncTransaction(self)
- def begin_nested(self) -> "AsyncTransaction":
+ def begin_nested(self):
"""Begin a nested transaction and return a transaction handle."""
self._sync_connection()
return AsyncTransaction(self, nested=True)
async def exec_driver_sql(
self,
- statement: Executable,
- parameters: Optional[Mapping] = None,
- execution_options: Mapping = util.EMPTY_DICT,
- ) -> Result:
+ statement,
+ parameters=None,
+ execution_options=util.EMPTY_DICT,
+ ):
r"""Executes a driver-level SQL string and return buffered
:class:`_engine.Result`.
async def stream(
self,
- statement: Executable,
- parameters: Optional[Mapping] = None,
- execution_options: Mapping = util.EMPTY_DICT,
- ) -> AsyncResult:
+ statement,
+ parameters=None,
+ execution_options=util.EMPTY_DICT,
+ ):
"""Execute a statement and return a streaming
:class:`_asyncio.AsyncResult` object."""
async def execute(
self,
- statement: Executable,
- parameters: Optional[Mapping] = None,
- execution_options: Mapping = util.EMPTY_DICT,
- ) -> Result:
+ statement,
+ parameters=None,
+ execution_options=util.EMPTY_DICT,
+ ):
r"""Executes a SQL statement construct and return a buffered
:class:`_engine.Result`.
async def scalar(
self,
- statement: Executable,
- parameters: Optional[Mapping] = None,
- execution_options: Mapping = util.EMPTY_DICT,
- ) -> Any:
+ statement,
+ parameters=None,
+ execution_options=util.EMPTY_DICT,
+ ):
r"""Executes a SQL statement construct and returns a scalar object.
This method is shorthand for invoking the
result = await self.execute(statement, parameters, execution_options)
return result.scalar()
- async def run_sync(self, fn: Callable, *arg, **kw) -> Any:
+ async def run_sync(self, fn, *arg, **kw):
"""Invoke the given sync callable passing self as the first argument.
This method maintains the asyncio event loop all the way through
await self.transaction.__aexit__(type_, value, traceback)
await self.conn.close()
- def __init__(self, sync_engine: Engine):
+ def __init__(self, sync_engine):
if not sync_engine.dialect.is_async:
raise exc.InvalidRequestError(
"The asyncio extension requires an async driver to be used. "
conn = self.connect()
return self._trans_ctx(conn)
- def connect(self) -> AsyncConnection:
+ def connect(self):
"""Return an :class:`_asyncio.AsyncConnection` object.
The :class:`_asyncio.AsyncConnection` will procure a database
return self._connection_cls(self)
- async def raw_connection(self) -> Any:
+ async def raw_connection(self):
"""Return a "raw" DBAPI connection from the connection pool.
.. seealso::
__slots__ = ("connection", "sync_transaction", "nested")
- def __init__(self, connection: AsyncConnection, nested: bool = False):
+ def __init__(self, connection, nested=False):
self.connection = connection
- self.sync_transaction: Optional[Transaction] = None
+ self.sync_transaction = None
self.nested = nested
@classmethod
def _from_existing_transaction(
- cls,
- connection: AsyncConnection,
- sync_transaction: Transaction,
- nested: bool = False,
+ cls, connection, sync_transaction, nested=False
):
obj = cls.__new__(cls)
obj.connection = connection
return self.sync_transaction
@property
- def is_valid(self) -> bool:
+ def is_valid(self):
return self._sync_transaction().is_valid
@property
- def is_active(self) -> bool:
+ def is_active(self):
return self._sync_transaction().is_active
async def close(self):
import operator
-from ... import util
from ...engine.result import _NO_ROW
from ...engine.result import FilterResult
from ...engine.result import FrozenResult
from ...engine.result import MergedResult
from ...util.concurrency import greenlet_spawn
-if util.TYPE_CHECKING:
- from typing import Any
- from typing import Int
- from typing import Iterator
- from typing import List
- from typing import Mapping
- from typing import Optional
-
- from ...engine.result import Row
-
class AsyncCommon(FilterResult):
async def close(self):
return self
def columns(self, *col_expressions):
- # type: (*object) -> AsyncResult
r"""Establish the columns that should be returned in each row.
Refer to :meth:`_engine.Result.columns` in the synchronous
return self._column_slices(col_expressions)
async def partitions(self, size=None):
- # type: (Optional[Int]) -> Iterator[List[Any]]
"""Iterate through sub-lists of rows of the size given.
An async iterator is returned::
break
async def fetchone(self):
- # type: () -> Row
"""Fetch one row.
When all rows are exhausted, returns None.
return row
async def fetchmany(self, size=None):
- # type: (Optional[Int]) -> List[Row]
"""Fetch many rows.
When all rows are exhausted, returns an empty list.
return await greenlet_spawn(self._manyrow_getter, self, size)
async def all(self):
- # type: () -> List[Row]
"""Return all rows in a list.
Closes the result set after invocation. Subsequent invocations
return row
async def first(self):
- # type: () -> Row
"""Fetch the first row or None if no row is present.
Closes the result set and discards remaining rows.
return await greenlet_spawn(self._only_one_row, False, False, False)
async def one_or_none(self):
- # type: () -> Optional[Row]
"""Return at most one result or raise an exception.
Returns ``None`` if the result has no rows.
return await greenlet_spawn(self._only_one_row, True, False, False)
async def scalar_one(self):
- # type: () -> Any
"""Return exactly one scalar result or raise an exception.
This is equivalent to calling :meth:`_asyncio.AsyncResult.scalars` and
return await greenlet_spawn(self._only_one_row, True, True, True)
async def scalar_one_or_none(self):
- # type: () -> Optional[Any]
"""Return exactly one or no scalar result.
This is equivalent to calling :meth:`_asyncio.AsyncResult.scalars` and
return await greenlet_spawn(self._only_one_row, True, False, True)
async def one(self):
- # type: () -> Row
"""Return exactly one row or raise an exception.
Raises :class:`.NoResultFound` if the result returns no
return await greenlet_spawn(self._only_one_row, True, True, False)
async def scalar(self):
- # type: () -> Optional[Any]
"""Fetch the first column of the first row, and close the result set.
Returns None if there are no rows to fetch.
return MergedResult(self._metadata, (self,) + others)
def scalars(self, index=0):
- # type: (Int) -> AsyncScalarResult
"""Return an :class:`_asyncio.AsyncScalarResult` filtering object which
will return single elements rather than :class:`_row.Row` objects.
return AsyncScalarResult(self._real_result, index)
def mappings(self):
- # type() -> AsyncMappingResult
"""Apply a mappings filter to returned rows, returning an instance of
:class:`_asyncio.AsyncMappingResult`.
self._unique_filter_state = real_result._unique_filter_state
def unique(self, strategy=None):
- # type: () -> AsyncScalarResult
"""Apply unique filtering to the objects returned by this
:class:`_asyncio.AsyncScalarResult`.
return self
async def partitions(self, size=None):
- # type: (Optional[Int]) -> Iterator[List[Any]]
"""Iterate through sub-lists of elements of the size given.
Equivalent to :meth:`_asyncio.AsyncResult.partitions` except that
break
async def fetchall(self):
- # type: () -> List[Any]
"""A synonym for the :meth:`_asyncio.AsyncScalarResult.all` method."""
return await greenlet_spawn(self._allrows)
async def fetchmany(self, size=None):
- # type: (Optional[Int]) -> List[Any]
"""Fetch many objects.
Equivalent to :meth:`_asyncio.AsyncResult.fetchmany` except that
return await greenlet_spawn(self._manyrow_getter, self, size)
async def all(self):
- # type: () -> List[Any]
"""Return all scalar values in a list.
Equivalent to :meth:`_asyncio.AsyncResult.all` except that
return row
async def first(self):
- # type: () -> Optional[Any]
"""Fetch the first object or None if no object is present.
Equivalent to :meth:`_asyncio.AsyncResult.first` except that
return await greenlet_spawn(self._only_one_row, False, False, False)
async def one_or_none(self):
- # type: () -> Optional[Any]
"""Return at most one object or raise an exception.
Equivalent to :meth:`_asyncio.AsyncResult.one_or_none` except that
return await greenlet_spawn(self._only_one_row, True, False, False)
async def one(self):
- # type: () -> Any
"""Return exactly one object or raise an exception.
Equivalent to :meth:`_asyncio.AsyncResult.one` except that
return self._metadata.keys
def unique(self, strategy=None):
- # type: () -> AsyncMappingResult
"""Apply unique filtering to the objects returned by this
:class:`_asyncio.AsyncMappingResult`.
return self
def columns(self, *col_expressions):
- # type: (*object) -> AsyncMappingResult
r"""Establish the columns that should be returned in each row."""
return self._column_slices(col_expressions)
async def partitions(self, size=None):
- # type: (Optional[Int]) -> Iterator[List[Mapping]]
"""Iterate through sub-lists of elements of the size given.
Equivalent to :meth:`_asyncio.AsyncResult.partitions` except that
break
async def fetchall(self):
- # type: () -> List[Mapping]
"""A synonym for the :meth:`_asyncio.AsyncMappingResult.all` method."""
return await greenlet_spawn(self._allrows)
async def fetchone(self):
- # type: () -> Mapping
"""Fetch one object.
Equivalent to :meth:`_asyncio.AsyncResult.fetchone` except that
return row
async def fetchmany(self, size=None):
- # type: (Optional[Int]) -> List[Mapping]
"""Fetch many objects.
Equivalent to :meth:`_asyncio.AsyncResult.fetchmany` except that
return await greenlet_spawn(self._manyrow_getter, self, size)
async def all(self):
- # type: () -> List[Mapping]
"""Return all scalar values in a list.
Equivalent to :meth:`_asyncio.AsyncResult.all` except that
return row
async def first(self):
- # type: () -> Optional[Mapping]
"""Fetch the first object or None if no object is present.
Equivalent to :meth:`_asyncio.AsyncResult.first` except that
return await greenlet_spawn(self._only_one_row, False, False, False)
async def one_or_none(self):
- # type: () -> Optional[Mapping]
"""Return at most one object or raise an exception.
Equivalent to :meth:`_asyncio.AsyncResult.one_or_none` except that
return await greenlet_spawn(self._only_one_row, True, False, False)
async def one(self):
- # type: () -> Mapping
"""Return exactly one object or raise an exception.
Equivalent to :meth:`_asyncio.AsyncResult.one` except that
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-
-from typing import Any
-from typing import Callable
-from typing import Mapping
-from typing import Optional
-from typing import TypeVar
-
from . import engine
from . import result as _result
from .base import StartableContext
-from .engine import AsyncEngine
from ... import util
-from ...engine import Result
from ...orm import Session
-from ...sql import Executable
from ...util.concurrency import greenlet_spawn
-T = TypeVar("T")
-
-
@util.create_proxy_methods(
Session,
":class:`_orm.Session`",
dispatch = None
- def __init__(
- self,
- bind: AsyncEngine = None,
- binds: Mapping[object, AsyncEngine] = None,
- **kw
- ):
+ def __init__(self, bind=None, binds=None, **kw):
kw["future"] = True
if bind:
self.bind = bind
with_for_update=with_for_update,
)
- async def run_sync(self, fn: Callable[..., T], *arg, **kw) -> T:
+ async def run_sync(self, fn, *arg, **kw):
"""Invoke the given sync callable passing sync self as the first
argument.
async def execute(
self,
- statement: Executable,
- params: Optional[Mapping] = None,
- execution_options: Mapping = util.EMPTY_DICT,
- bind_arguments: Optional[Mapping] = None,
+ statement,
+ params=None,
+ execution_options=util.EMPTY_DICT,
+ bind_arguments=None,
**kw
- ) -> Result:
+ ):
"""Execute a statement and return a buffered
:class:`_engine.Result` object."""
async def scalar(
self,
- statement: Executable,
- params: Optional[Mapping] = None,
- execution_options: Mapping = util.EMPTY_DICT,
- bind_arguments: Optional[Mapping] = None,
+ statement,
+ params=None,
+ execution_options=util.EMPTY_DICT,
+ bind_arguments=None,
**kw
- ) -> Any:
+ ):
"""Execute a statement and return a scalar result."""
result = await self.execute(
from ..util import hybridmethod
from ..util import hybridproperty
-if util.TYPE_CHECKING:
- from .mapper import Mapper
-
def has_inherited_table(cls):
"""Given a class, return True if any of the classes it inherits from has a
return decorate
def map_declaratively(self, cls):
- # type: (type) -> Mapper
"""Map a class declaratively.
In this form of mapping, the class is scanned for mapping information,
from ..sql.base import ExecutableOption
from ..sql.traversals import HasCacheKey
-if util.TYPE_CHECKING:
- from typing import Any
- from typing import List
- from typing import Optional
-
- from .mapper import Mapper
- from .util import AliasedInsp
__all__ = (
"EXT_CONTINUE",
def __init__(
self,
- prop, # type: MapperProperty
- parentmapper, # type: Mapper
- adapt_to_entity=None, # type: Optional[AliasedInsp]
+ prop,
+ parentmapper,
+ adapt_to_entity=None,
):
self.prop = self.property = prop
self._parententity = adapt_to_entity or parentmapper
def __clause_element__(self):
raise NotImplementedError("%r" % self)
- def _bulk_update_tuples(
- self, value # type: (operators.ColumnOperators)
- ):
- # type: (...) -> List[tuple[operators.ColumnOperators, Any]]
+ def _bulk_update_tuples(self, value):
"""Receive a SQL expression that represents a value in the SET
clause of an UPDATE statement.
from ..sql.util import visit_binary_product
-if util.TYPE_CHECKING:
- from typing import Union
-
- from .util import AliasedInsp
-
-
def remote(expr):
"""Annotate a portion of a primaryjoin expression
with a 'remote' annotation.
@util.memoized_property
@util.preload_module("sqlalchemy.orm.mapper")
- def entity(self): # type: () -> Union[AliasedInsp, mapperlib.Mapper]
+ def entity(self):
"""Return the target mapped entity, which is an inspect() of the
class or aliased class that is referred towards.
from ..sql.base import _generative
from ..sql.base import Generative
-if util.TYPE_CHECKING:
- from typing import Sequence
-
- from .context import QueryContext
- from ..sql.elements import ColumnElement
-
class Load(Generative, LoaderOption):
"""Represents loader options which modify the state of a
return load
def _generate_extra_criteria(self, context):
- # type: (QueryContext) -> Sequence[ColumnElement]
"""Apply the current bound parameters in a QueryContext to the
"extra_criteria" stored with this Load object.
from ..util import HasMemoized
from ..util import hybridmethod
-if util.TYPE_CHECKING:
- from types import ModuleType
-coercions = None # type: ModuleType
-elements = None # type: ModuleType
-type_api = None # type: ModuleType
+coercions = None
+elements = None
+type_api = None
PARSE_AUTOCOMMIT = util.symbol("PARSE_AUTOCOMMIT")
NO_ARG = util.symbol("NO_ARG")
from .. import util
from ..util import collections_abc
-if util.TYPE_CHECKING:
- from types import ModuleType
-
-elements = None # type: ModuleType
-lambdas = None # type: ModuleType
-schema = None # type: ModuleType
-selectable = None # type: ModuleType
-sqltypes = None # type: ModuleType
-traversals = None # type: ModuleType
+
+elements = None
+lambdas = None
+schema = None
+selectable = None
+sqltypes = None
+traversals = None
def _is_literal(element):
from .. import inspection
from .. import util
-if util.TYPE_CHECKING:
- from typing import Any
- from typing import Optional
- from typing import Union
-
def collate(expression, collation):
"""Return the clause ``expression COLLATE collation``.
)
def self_group(self, against=None):
- # type: (Optional[Any]) -> ClauseElement
"""Apply a 'grouping' to this :class:`_expression.ClauseElement`.
This method is overridden by subclasses to return a "grouping"
_alt_names = ()
def self_group(self, against=None):
- # type: (Optional[Any]) -> ClauseElement
if (
against in (operators.and_, operators.or_, operators._asbool)
and self.type._type_affinity is type_api.BOOLEANTYPE._type_affinity
return self.type.comparator_factory(self)
def self_group(self, against=None):
- # type: (Optional[Any]) -> Union[Grouping, TextClause]
if against is operators.in_op:
return Grouping(self)
else:
return list(itertools.chain(*[c._from_objects for c in self.clauses]))
def self_group(self, against=None):
- # type: (Optional[Any]) -> ClauseElement
if self.group and operators.is_precedent(self.operator, against):
return Grouping(self)
else:
return (self,)
def self_group(self, against=None):
- # type: (Optional[Any]) -> ClauseElement
if not self.clauses:
return self
else:
return ClauseElement._negate(self)
def self_group(self, against=None):
- # type: (Optional[Any]) -> ClauseElement
if self.operator and operators.is_precedent(self.operator, against):
return Grouping(self)
else:
return self.element
def self_group(self, against=None):
- # type: (Optional[Any]) -> ClauseElement
return self
def _negate(self):
return self.left._from_objects + self.right._from_objects
def self_group(self, against=None):
- # type: (Optional[Any]) -> ClauseElement
if operators.is_precedent(self.operator, against):
return Grouping(self)
self.type = type_api.NULLTYPE
def self_group(self, against=None):
- # type: (Optional[Any]) -> ClauseElement
assert against is operator.getitem
return self
__visit_name__ = "grouping"
def self_group(self, against=None):
- # type: (Optional[Any]) -> ClauseElement
return self
def _ungroup(self):
return self._element.self_group(against=operators.as_)
def self_group(self, against=None):
- # type: (Optional[Any]) -> ClauseElement
return self._apply_to_inner(self._element.self_group, against=against)
def _negate(self):
def safe_construct(
cls, seed, body, enclosing_label=None, sanitize_key=False
):
- # type: (int, str, Optional[_anonymous_label]) -> _anonymous_label
if sanitize_key:
body = re.sub(r"[%\(\) \$]+", "_", body).strip("_")
from .. import util
from ..inspection import inspect
-if util.TYPE_CHECKING:
- from typing import Any
- from typing import Optional
-
class _OffsetLimitParam(BindParameter):
inherit_cache = True
is_select = True
def _generate_fromclause_column_proxies(self, fromclause):
- # type: (FromClause) -> None
raise NotImplementedError()
def _refresh_for_new_column(self, column):
_is_select_container = True
def __init__(self, element):
- # type: (SelectBase) -> None
self.element = coercions.expect(roles.SelectStatementRole, element)
def _ensure_disambiguated_names(self):
return self.element
def self_group(self, against=None):
- # type: (Optional[Any]) -> FromClause
return self
def _generate_fromclause_column_proxies(self, subquery):
return self.selects[0]._scalar_type()
def self_group(self, against=None):
- # type: (Optional[Any]) -> FromClause
return SelectStatementGrouping(self)
def is_derived_from(self, fromclause):
import pytest
-
-try:
- import typing
-except ImportError:
- pass
-else:
- if typing.TYPE_CHECKING:
- from typing import Sequence
-
try:
import xdist # noqa
return fn
else:
if argnames is None:
- _argnames = getargspec(fn).args[1:] # type: Sequence(str)
+ _argnames = getargspec(fn).args[1:]
else:
- _argnames = re.split(
- r", *", argnames
- ) # type: Sequence(str)
+ _argnames = re.split(r", *", argnames)
if has_exclusions:
_argnames += ["_exclusions"]
FOLLOWER_IDENT = None
-if compat.TYPE_CHECKING:
- from ..engine import URL
-
class register(object):
def __init__(self):
@register.init
def generate_driver_url(url, driver, query_str):
- # type: (URL, str, str) -> URL
backend = url.get_backend_name()
new_url = url.set(