--- /dev/null
+.. change::
+ :tags: bug, orm
+ :tickets: 8853, 9335
+
+ Continued the fix for :ticket:`8853`, allowing the :class:`_orm.Mapped`
+ name to be fully qualified regardless of whether or not
+ ``from __annotations__ import future`` were present. This issue first fixed
+ in 2.0.0b3 confirmed that this case worked via the test suite, however the
+ test suite apparently was not testing the behavior for the name ``Mapped``
+ not being locally present at all; string resolution has been updated to
+ ensure the ``Mapped`` symbol is locatable as applies to how the ORM uses
+ these functions.
from .util import _extract_mapped_subtype
from .util import _is_mapped_annotation
from .util import class_mapper
+from .util import de_stringify_annotation
from .. import event
from .. import exc
from .. import util
from ..sql.schema import Table
from ..util import topological
from ..util.typing import _AnnotationScanType
-from ..util.typing import de_stringify_annotation
from ..util.typing import is_fwd_ref
from ..util.typing import is_literal
from ..util.typing import Protocol
from .interfaces import MapperProperty
from .interfaces import PropComparator
from .util import _none_set
+from .util import de_stringify_annotation
from .. import event
from .. import exc as sa_exc
from .. import schema
from ..sql import expression
from ..sql import operators
from ..sql.elements import BindParameter
-from ..util.typing import de_stringify_annotation
from ..util.typing import is_fwd_ref
from ..util.typing import is_pep593
from ..util.typing import typing_get_args
from .interfaces import PropComparator
from .interfaces import StrategizedProperty
from .relationships import RelationshipProperty
+from .util import de_stringify_annotation
+from .util import de_stringify_union_elements
from .. import exc as sa_exc
from .. import ForeignKey
from .. import log
from ..sql.schema import SchemaConst
from ..sql.type_api import TypeEngine
from ..util.typing import de_optionalize_union_types
-from ..util.typing import de_stringify_annotation
-from ..util.typing import de_stringify_union_elements
from ..util.typing import is_fwd_ref
from ..util.typing import is_optional_union
from ..util.typing import is_pep593
from __future__ import annotations
import enum
+import functools
import re
import types
import typing
from .base import class_mapper as class_mapper
from .base import InspectionAttr as InspectionAttr
from .base import instance_str as instance_str # noqa: F401
+from .base import Mapped
from .base import object_mapper as object_mapper
from .base import object_state as object_state # noqa: F401
from .base import opt_manager_of_class
from ..sql.elements import KeyedColumnElement
from ..sql.selectable import FromClause
from ..util.langhelpers import MemoizedSlots
-from ..util.typing import de_stringify_annotation
-from ..util.typing import eval_name_only
+from ..util.typing import de_stringify_annotation as _de_stringify_annotation
+from ..util.typing import (
+ de_stringify_union_elements as _de_stringify_union_elements,
+)
+from ..util.typing import eval_name_only as _eval_name_only
from ..util.typing import is_origin_of_cls
from ..util.typing import Literal
+from ..util.typing import Protocol
from ..util.typing import typing_get_origin
if typing.TYPE_CHECKING:
from ..sql.selectable import Subquery
from ..sql.visitors import anon_map
from ..util.typing import _AnnotationScanType
+ from ..util.typing import ArgsTypeProcotol
_T = TypeVar("_T", bound=Any)
)
+_de_stringify_partial = functools.partial(
+ functools.partial, locals_=util.immutabledict({"Mapped": Mapped})
+)
+
+# partial is practically useless as we have to write out the whole
+# function and maintain the signature anyway
+
+
+class _DeStringifyAnnotation(Protocol):
+ def __call__(
+ self,
+ cls: Type[Any],
+ annotation: _AnnotationScanType,
+ originating_module: str,
+ *,
+ str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
+ include_generic: bool = False,
+ ) -> Type[Any]:
+ ...
+
+
+de_stringify_annotation = cast(
+ _DeStringifyAnnotation, _de_stringify_partial(_de_stringify_annotation)
+)
+
+
+class _DeStringifyUnionElements(Protocol):
+ def __call__(
+ self,
+ cls: Type[Any],
+ annotation: ArgsTypeProcotol,
+ originating_module: str,
+ *,
+ str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
+ ) -> Type[Any]:
+ ...
+
+
+de_stringify_union_elements = cast(
+ _DeStringifyUnionElements,
+ _de_stringify_partial(_de_stringify_union_elements),
+)
+
+
+class _EvalNameOnly(Protocol):
+ def __call__(self, name: str, module_name: str) -> Any:
+ ...
+
+
+eval_name_only = cast(_EvalNameOnly, _de_stringify_partial(_eval_name_only))
+
+
class CascadeOptions(FrozenSet[str]):
"""Keeps track of the options sent to
:paramref:`.relationship.cascade`"""
cls,
raw_annotation,
originating_module,
- _cleanup_mapped_str_annotation,
+ str_cleanup_fn=_cleanup_mapped_str_annotation,
)
except _CleanupError as ce:
raise sa_exc.ArgumentError(
from typing import ForwardRef
from typing import Generic
from typing import Iterable
+from typing import Mapping
from typing import NewType
from typing import NoReturn
from typing import Optional
cls: Type[Any],
annotation: _AnnotationScanType,
originating_module: str,
+ locals_: Mapping[str, Any],
+ *,
str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
include_generic: bool = False,
) -> Type[Any]:
if str_cleanup_fn:
annotation = str_cleanup_fn(annotation, originating_module)
- annotation = eval_expression(annotation, originating_module)
+ annotation = eval_expression(
+ annotation, originating_module, locals_=locals_
+ )
if (
include_generic
cls,
elem,
originating_module,
+ locals_,
str_cleanup_fn=str_cleanup_fn,
include_generic=include_generic,
)
return annotation.__origin__[elements] # type: ignore
-def eval_expression(expression: str, module_name: str) -> Any:
+def eval_expression(
+ expression: str,
+ module_name: str,
+ *,
+ locals_: Optional[Mapping[str, Any]] = None,
+) -> Any:
try:
base_globals: Dict[str, Any] = sys.modules[module_name].__dict__
except KeyError as ke:
f"Module {module_name} isn't present in sys.modules; can't "
f"evaluate expression {expression}"
) from ke
+
try:
- annotation = eval(expression, base_globals, None)
+ annotation = eval(expression, base_globals, locals_)
except Exception as err:
raise NameError(
f"Could not de-stringify annotation {expression!r}"
return annotation
-def eval_name_only(name: str, module_name: str) -> Any:
+def eval_name_only(
+ name: str,
+ module_name: str,
+ *,
+ locals_: Optional[Mapping[str, Any]] = None,
+) -> Any:
if "." in name:
- return eval_expression(name, module_name)
+ return eval_expression(name, module_name, locals_=locals_)
try:
base_globals: Dict[str, Any] = sys.modules[module_name].__dict__
cls: Type[Any],
annotation: ArgsTypeProcotol,
originating_module: str,
+ locals_: Mapping[str, Any],
+ *,
str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
) -> Type[Any]:
return make_union_type(
*[
de_stringify_annotation(
- cls, anno, originating_module, str_cleanup_fn
+ cls,
+ anno,
+ originating_module,
+ {},
+ str_cleanup_fn=str_cleanup_fn,
)
for anno in annotation.__args__
]
--- /dev/null
+"""
+this file tests that absolute imports can be used in declarative
+mappings while guaranteeing that the Mapped name is not locally present
+
+"""
+
+from __future__ import annotations
+
+import sqlalchemy
+from sqlalchemy import orm
+import sqlalchemy.orm
+import sqlalchemy.testing
+import sqlalchemy.testing.fixtures
+
+try:
+ x = Mapped # type: ignore
+except NameError:
+ pass
+else:
+ raise Exception("Mapped name **must not be imported in this file**")
+
+
+class MappedColumnTest(
+ sqlalchemy.testing.fixtures.TestBase, sqlalchemy.testing.AssertsCompiledSQL
+):
+ __dialect__ = "default"
+
+ def test_fully_qualified_mapped_name(self, decl_base):
+ """test #8853 *again*, as reported in #9335 this failed to be fixed"""
+
+ class Foo(decl_base):
+ __tablename__ = "foo"
+
+ id: sqlalchemy.orm.Mapped[int] = sqlalchemy.orm.mapped_column(
+ primary_key=True
+ )
+
+ data: sqlalchemy.orm.Mapped[int] = sqlalchemy.orm.mapped_column()
+
+ data2: sqlalchemy.orm.Mapped[int]
+
+ data3: orm.Mapped[int]
+
+ self.assert_compile(
+ sqlalchemy.select(Foo),
+ "SELECT foo.id, foo.data, foo.data2, foo.data3 FROM foo",
+ )
class MappedColumnTest(_MappedColumnTest):
def test_fully_qualified_mapped_name(self, decl_base):
- """test #8853, regression caused by #8759 ;)""" # noqa: E501
+ """test #8853, regression caused by #8759 ;)
+
+
+ See same test in test_abs_import_only
+
+ """
class Foo(decl_base):
__tablename__ = "foo"