from . import interfaces
from .descriptor_props import SynonymProperty
from .properties import ColumnProperty
+from .util import _metadata_for_cls
from .util import class_mapper
from .. import exc
from .. import inspection
self._resolvers = ()
self.tables_only = tables_only
+ def _resolve_table_key(
+ self, key: str, metadata: MetaData
+ ) -> Optional[Table]:
+ if metadata.schema is not None and "." not in key:
+ schema_key = _get_table_key(key, metadata.schema)
+ if schema_key in metadata.tables:
+ return metadata.tables[schema_key]
+ if key in metadata.tables:
+ util.warn_deprecated(
+ "The string '%s' was resolved to the "
+ "non-schema-qualified table '%s', however "
+ "the MetaData object has a default schema "
+ "of '%s'. In a future version of SQLAlchemy, "
+ "this unqualified name will be resolved as "
+ "'%s'. To reference a table without a "
+ "schema, use the Table object directly."
+ % (key, key, metadata.schema, schema_key),
+ "2.1",
+ )
+ return metadata.tables[key]
+ elif key in metadata.tables:
+ return metadata.tables[key]
+ return None
+
def _access_cls(self, key: str) -> Any:
cls = self.cls
manager = attributes.manager_of_class(cls)
- decl_base = manager.registry
- assert decl_base is not None
- decl_class_registry = decl_base._class_registry
- metadata = decl_base.metadata
+ registry = manager.registry
+ assert registry is not None
+ decl_class_registry = registry._class_registry
+ metadata = _metadata_for_cls(cls, registry)
if self.tables_only:
- if key in metadata.tables:
- return metadata.tables[key]
+ table = self._resolve_table_key(key, metadata)
+ if table is not None:
+ return table
elif key in metadata._schemas:
- return _GetTable(key, getattr(cls, "metadata", metadata))
+ return _GetTable(key, metadata)
if key in decl_class_registry:
dt = _determine_container(key, decl_class_registry[key])
return dt
if not self.tables_only:
- if key in metadata.tables:
- return metadata.tables[key]
+ table = self._resolve_table_key(key, metadata)
+ if table is not None:
+ return table
elif key in metadata._schemas:
- return _GetTable(key, getattr(cls, "metadata", metadata))
+ return _GetTable(key, metadata)
if "_sa_module_registry" in decl_class_registry and key in cast(
_ModuleMarker, decl_class_registry["_sa_module_registry"]
):
- registry = cast(
+ _module_registry = cast(
_ModuleMarker, decl_class_registry["_sa_module_registry"]
)
- return registry.resolve_attr(key)
+ return _module_registry.resolve_attr(key)
if self._resolvers:
for resolv in self._resolvers:
f"['{clsarg}']] = relationship()\""
) from err
else:
+ manager = attributes.manager_of_class(self.cls)
+ registry = manager.registry
+ metadata = (
+ _metadata_for_cls(self.cls, registry)
+ if registry is not None
+ else None
+ )
+
+ # when deprecated fallback lookup in
+ # _resolve_table_key is removed, consider adding
+ # additional context to the error message if the
+ # unqualified key is located under BLANK_SCHEMA
+ if metadata is not None and metadata.schema is not None:
+ schema_key = _get_table_key(name, metadata.schema)
+ assert schema_key not in metadata.tables
+
raise exc.InvalidRequestError(
"When initializing mapper %s, expression %r failed to "
"locate a name (%r). If this is a class name, consider "
"adding this relationship() to the %r class after "
"both dependent classes have been defined."
- % (self.prop.parent, self.arg, name, self.cls)
+ % (
+ self.prop.parent,
+ self.arg,
+ name,
+ self.cls,
+ )
) from err
def _resolve_name(self) -> Union[Table, Type[Any], _ModNS]:
+from contextlib import nullcontext
+
+from sqlalchemy import BLANK_SCHEMA
from sqlalchemy import Column
from sqlalchemy import exc
+from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy import MetaData
+from sqlalchemy import String
from sqlalchemy import testing
+from sqlalchemy.orm import class_mapper
from sqlalchemy.orm import clsregistry
+from sqlalchemy.orm import configure_mappers
+from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import registry
from sqlalchemy.orm import relationship
from sqlalchemy.testing import assert_raises_message
+from sqlalchemy.testing import assertions
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import mock
from sqlalchemy.testing.assertions import expect_warnings
+from sqlalchemy.testing.schema import Table
from sqlalchemy.testing.util import gc_collect
):
registry.configure()
+ @testing.variation("has_default_schema", [True, False])
+ def test_name_resolution_failure_error_message(self, has_default_schema):
+ """test #13291"""
+ if has_default_schema:
+ metadata = MetaData(schema="fooschema")
+ else:
+ metadata = MetaData()
+
+ reg = registry(metadata=metadata)
+ Base = reg.generate_base()
+
+ class MyClass(Base):
+ __tablename__ = "my_table"
+ id = Column(Integer, primary_key=True)
+
+ MyClass.foo = relationship(
+ "Foo",
+ secondary="nonexistent_table",
+ backref="my_classes",
+ )
+
+ with expect_raises_message(
+ exc.InvalidRequestError,
+ r"When initializing mapper .*MyClass.*, expression "
+ r"'nonexistent_table' failed to locate a name "
+ r"\('nonexistent_table'\)",
+ ):
+ reg.configure()
+
+ reg.dispose()
+
def test_no_fns_in_name_resolve(self):
base = registry()
f1 = MockClass(base, "foo.bar.Foo")
del f4
gc_collect()
assert "single" not in reg
+
+ @testing.variation(
+ "resolve_type", ["secondary_only", "primaryjoin_secondaryjoin"]
+ )
+ @testing.variation("owner_schema", ["inherits", "blank", "different"])
+ @testing.variation(
+ "secondary_schema",
+ ["inherits", "blank", "inherits_qualified"],
+ )
+ def test_string_dependency_resolution_default_schema(
+ self, resolve_type, owner_schema, secondary_schema
+ ):
+ """test #13291"""
+ metadata = MetaData(schema="fooschema")
+ Base = declarative_base(metadata=metadata)
+
+ if owner_schema.inherits:
+ owner_schema_kw: dict = {}
+ elif owner_schema.blank:
+ owner_schema_kw = {"schema": BLANK_SCHEMA}
+ elif owner_schema.different:
+ owner_schema_kw = {"schema": "otherschema"}
+ else:
+ owner_schema.fail()
+
+ if secondary_schema.inherits or secondary_schema.inherits_qualified:
+ sec_kw: dict = {}
+ elif secondary_schema.blank:
+ sec_kw = {"schema": BLANK_SCHEMA}
+ else:
+ secondary_schema.fail()
+
+ if secondary_schema.inherits_qualified:
+ secondary_ref = "fooschema.user_to_prop"
+ else:
+ secondary_ref = "user_to_prop"
+
+ class User(Base):
+ __tablename__ = "users"
+ __table_args__ = owner_schema_kw
+ id = Column(Integer, primary_key=True)
+ name = Column(String(50))
+
+ class Prop(Base):
+ __tablename__ = "props"
+ __table_args__ = owner_schema_kw
+ id = Column(Integer, primary_key=True)
+ name = Column(String(50))
+
+ user_to_prop = Table(
+ "user_to_prop",
+ Base.metadata,
+ Column(
+ "user_id",
+ Integer,
+ ForeignKey(User.__table__.c.id),
+ ),
+ Column(
+ "prop_id",
+ Integer,
+ ForeignKey(Prop.__table__.c.id),
+ ),
+ **sec_kw,
+ )
+
+ if resolve_type.secondary_only:
+ User.props = relationship(
+ "Prop",
+ secondary=secondary_ref,
+ backref="users",
+ )
+ elif resolve_type.primaryjoin_secondaryjoin:
+ User.props = relationship(
+ "Prop",
+ secondary=user_to_prop,
+ primaryjoin=("User.id==user_to_prop.c.user_id"),
+ secondaryjoin=("user_to_prop.c.prop_id==Prop.id"),
+ backref="users",
+ )
+ else:
+ resolve_type.fail()
+
+ expects_warning = (
+ secondary_schema.blank and not secondary_schema.inherits_qualified
+ )
+
+ if expects_warning:
+ ctx = assertions.expect_deprecated(
+ r"The string 'user_to_prop' was resolved"
+ )
+ else:
+ ctx = nullcontext()
+
+ with ctx:
+ configure_mappers()
+
+ assert (
+ class_mapper(User).get_property("props").secondary is user_to_prop
+ )
+
+ @testing.variation(
+ "resolve_type", ["secondary_only", "primaryjoin_secondaryjoin"]
+ )
+ @testing.variation(
+ "mapping_style", ["declarative_base", "registry_mapped"]
+ )
+ def test_string_dependency_resolution_cls_metadata(
+ self, resolve_type, mapping_style
+ ):
+ """test #8068"""
+ alt_metadata = MetaData()
+
+ a_to_b = Table(
+ "a_to_b",
+ alt_metadata,
+ Column("a_id", Integer, ForeignKey("a.id")),
+ Column("b_id", Integer, ForeignKey("b.id")),
+ )
+
+ if mapping_style.declarative_base:
+ Base = declarative_base()
+
+ class AltMetadataMixin(Base):
+ __abstract__ = True
+ metadata = alt_metadata
+
+ if resolve_type.secondary_only:
+
+ class A(AltMetadataMixin):
+ __tablename__ = "a"
+ id = Column(Integer, primary_key=True)
+ bs = relationship("B", secondary="a_to_b", backref="as_")
+
+ elif resolve_type.primaryjoin_secondaryjoin:
+
+ class A(AltMetadataMixin):
+ __tablename__ = "a"
+ id = Column(Integer, primary_key=True)
+ bs = relationship(
+ "B",
+ secondary=a_to_b,
+ primaryjoin="A.id==a_to_b.c.a_id",
+ secondaryjoin="a_to_b.c.b_id==B.id",
+ backref="as_",
+ )
+
+ else:
+ resolve_type.fail()
+
+ class B(AltMetadataMixin):
+ __tablename__ = "b"
+ id = Column(Integer, primary_key=True)
+ a_id = Column(Integer, ForeignKey("a.id"))
+
+ elif mapping_style.registry_mapped:
+ reg = registry()
+
+ class AltMetadataMixin:
+ metadata = alt_metadata
+
+ if resolve_type.secondary_only:
+
+ @reg.mapped
+ class A(AltMetadataMixin):
+ __tablename__ = "a"
+ id = Column(Integer, primary_key=True)
+ bs = relationship("B", secondary="a_to_b", backref="as_")
+
+ elif resolve_type.primaryjoin_secondaryjoin:
+
+ @reg.mapped
+ class A(AltMetadataMixin):
+ __tablename__ = "a"
+ id = Column(Integer, primary_key=True)
+ bs = relationship(
+ "B",
+ secondary=a_to_b,
+ primaryjoin="A.id==a_to_b.c.a_id",
+ secondaryjoin="a_to_b.c.b_id==B.id",
+ backref="as_",
+ )
+
+ else:
+ resolve_type.fail()
+
+ @reg.mapped
+ class B(AltMetadataMixin):
+ __tablename__ = "b"
+ id = Column(Integer, primary_key=True)
+ a_id = Column(Integer, ForeignKey("a.id"))
+
+ else:
+ mapping_style.fail()
+
+ configure_mappers()
+
+ assert class_mapper(A).get_property("bs").secondary is a_to_b