) -> AsyncSessionTransaction:
self.sync_transaction = self._assign_proxied(
await greenlet_spawn(
- self.session.sync_session.begin_nested # type: ignore
+ self.session.sync_session.begin_nested
if self.nested
else self.session.sync_session.begin
)
from typing import TypeVar
from typing import Union
-from . import operators
from . import roles
from . import visitors
from ._typing import is_from_clause
% (elem.__class__.__name__)
)
- def _literal_coercion( # type: ignore[override]
- self, element, *, expr, operator, **kw
- ):
+ @util.preload_module("sqlalchemy.sql.elements")
+ def _literal_coercion(self, element, *, expr, operator, **kw):
if util.is_non_string_iterable(element):
non_literal_expressions: Dict[
- Optional[operators.ColumnOperators],
- operators.ColumnOperators,
+ Optional[ColumnElement[Any]],
+ ColumnElement[Any],
] = {}
element = list(element)
for o in element:
if not _is_literal(o):
- if not isinstance(o, operators.ColumnOperators):
+ if not isinstance(
+ o, util.preloaded.sql_elements.ColumnElement
+ ):
self._raise_for_expected(element, **kw)
else: