) -> Optional[_PathRepresentation]:
i = -1
- for i, (c_token, p_token) in enumerate(zip(to_chop, path.path)):
+ for i, (c_token, p_token) in enumerate(
+ zip(to_chop, path.natural_path)
+ ):
if isinstance(c_token, str):
if i == 0 and c_token.endswith(f":{_DEFAULT_TOKEN}"):
return to_chop
elif (
isinstance(c_token, InspectionAttr)
and insp_is_mapper(c_token)
- and (
- (insp_is_mapper(p_token) and c_token.isa(p_token))
- or (
- # a too-liberal check here to allow a path like
- # A->A.bs->B->B.cs->C->C.ds, natural path, to chop
- # against current path
- # A->A.bs->B(B, B2)->B(B, B2)->cs, in an of_type()
- # scenario which should only be occurring in a loader
- # that is against a non-aliased lead element with
- # single path. otherwise the
- # "B" won't match into the B(B, B2).
- #
- # i>=2 prevents this check from proceeding for
- # the first path element.
- #
- # if we could do away with the "natural_path"
- # concept, we would not need guessy checks like this
- #
- # two conflicting tests for this comparison are:
- # test_eager_relations.py->
- # test_lazyload_aliased_abs_bcs_two
- # and
- # test_of_type.py->test_all_subq_query
- #
- i >= 2
- and insp_is_aliased_class(p_token)
- and p_token._is_with_polymorphic
- and c_token in p_token.with_polymorphic_mappers
- )
- )
+ and insp_is_mapper(p_token)
+ and c_token.isa(p_token)
):
continue
strategy: Optional[Tuple[Any, ...]]
local_opts: _OptsType
- path: Tuple[str, ...]
+ path: Union[Tuple[()], Tuple[str]]
propagate_to_loaders = False
def __init__(self) -> None:
it may be used as the sub-option of a :class:`_orm.Load` object.
"""
+ assert self.path
attr = self.path[0]
if attr.endswith(_DEFAULT_TOKEN):
attr = f"{attr.split(':')[0]}:{_WILDCARD_TOKEN}"
start_path: _PathRepresentation = self.path
- # TODO: chop_path already occurs in loader.process_compile_state()
- # so we will seek to simplify this
if current_path:
+ # TODO: no cases in test suite where we actually get
+ # None back here
new_path = self._chop_path(start_path, current_path)
- if not new_path:
+ if new_path is None:
return
- start_path = new_path
+
+ # chop_path does not actually "chop" a wildcard token path,
+ # just returns it
+ assert new_path == start_path
# start_path is a single-token tuple
assert start_path and len(start_path) == 1
"""
- chopped_start_path = Load._chop_path(effective_path.path, current_path)
+ chopped_start_path = Load._chop_path(
+ effective_path.natural_path, current_path
+ )
if not chopped_start_path:
return None
"""
+from __future__ import annotations
+
+from typing import Optional
+
from sqlalchemy import exists
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy.orm import class_mapper
from sqlalchemy.orm import column_property
from sqlalchemy.orm import contains_eager
+from sqlalchemy.orm import immediateload
from sqlalchemy.orm import join
from sqlalchemy.orm import joinedload
+from sqlalchemy.orm import Mapped
+from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import polymorphic_union
from sqlalchemy.orm import relationship
+from sqlalchemy.orm import selectinload
from sqlalchemy.orm import Session
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import with_polymorphic
)
else:
scenario.fail()
+
+
+class PolyIntoSelfReferentialTest(
+ fixtures.DeclarativeMappedTest, AssertsExecutionResults
+):
+ """test for #9715"""
+
+ @classmethod
+ def setup_classes(cls):
+ Base = cls.DeclarativeBasic
+
+ class A(Base):
+ __tablename__ = "a"
+
+ id: Mapped[int] = mapped_column(
+ primary_key=True, autoincrement=True
+ )
+
+ rel_id: Mapped[int] = mapped_column(ForeignKey("related.id"))
+
+ related = relationship("Related")
+
+ class Related(Base):
+ __tablename__ = "related"
+
+ id: Mapped[int] = mapped_column(
+ primary_key=True, autoincrement=True
+ )
+ rel_data: Mapped[str]
+ type: Mapped[str] = mapped_column()
+
+ other_related_id: Mapped[int] = mapped_column(
+ ForeignKey("other_related.id")
+ )
+
+ other_related = relationship("OtherRelated")
+
+ __mapper_args__ = {
+ "polymorphic_identity": "related",
+ "polymorphic_on": type,
+ }
+
+ class SubRelated(Related):
+ __tablename__ = "sub_related"
+
+ id: Mapped[int] = mapped_column(
+ ForeignKey("related.id"), primary_key=True
+ )
+ sub_rel_data: Mapped[str]
+
+ __mapper_args__ = {"polymorphic_identity": "sub_related"}
+
+ class OtherRelated(Base):
+ __tablename__ = "other_related"
+
+ id: Mapped[int] = mapped_column(
+ primary_key=True, autoincrement=True
+ )
+ name: Mapped[str]
+
+ parent_id: Mapped[Optional[int]] = mapped_column(
+ ForeignKey("other_related.id")
+ )
+ parent = relationship("OtherRelated", lazy="raise", remote_side=id)
+
+ @classmethod
+ def insert_data(cls, connection):
+ A, SubRelated, OtherRelated = cls.classes(
+ "A", "SubRelated", "OtherRelated"
+ )
+
+ with Session(connection) as sess:
+
+ grandparent_otherrel1 = OtherRelated(name="GP1")
+ grandparent_otherrel2 = OtherRelated(name="GP2")
+
+ parent_otherrel1 = OtherRelated(
+ name="P1", parent=grandparent_otherrel1
+ )
+ parent_otherrel2 = OtherRelated(
+ name="P2", parent=grandparent_otherrel2
+ )
+
+ otherrel1 = OtherRelated(name="A1", parent=parent_otherrel1)
+ otherrel3 = OtherRelated(name="A2", parent=parent_otherrel2)
+
+ address1 = SubRelated(
+ rel_data="ST1", other_related=otherrel1, sub_rel_data="w1"
+ )
+ address3 = SubRelated(
+ rel_data="ST2", other_related=otherrel3, sub_rel_data="w2"
+ )
+
+ a1 = A(related=address1)
+ a2 = A(related=address3)
+
+ sess.add_all([a1, a2])
+ sess.commit()
+
+ def _run_load(self, *opt):
+ A = self.classes.A
+ stmt = select(A).options(*opt)
+
+ sess = fixture_session()
+ all_a = sess.scalars(stmt).all()
+
+ sess.close()
+
+ with self.assert_statement_count(testing.db, 0):
+ for a1 in all_a:
+ d1 = a1.related
+ d2 = d1.other_related
+ d3 = d2.parent
+ d4 = d3.parent
+ assert d4.name in ("GP1", "GP2")
+
+ @testing.variation("use_workaround", [True, False])
+ def test_workaround(self, use_workaround):
+ A, Related, SubRelated, OtherRelated = self.classes(
+ "A", "Related", "SubRelated", "OtherRelated"
+ )
+
+ related = with_polymorphic(Related, [SubRelated], flat=True)
+
+ opt = [
+ (
+ joinedload(A.related.of_type(related))
+ .joinedload(related.other_related)
+ .joinedload(
+ OtherRelated.parent,
+ )
+ )
+ ]
+ if use_workaround:
+ opt.append(
+ joinedload(
+ A.related,
+ Related.other_related,
+ OtherRelated.parent,
+ OtherRelated.parent,
+ )
+ )
+ else:
+ opt[0] = opt[0].joinedload(OtherRelated.parent)
+
+ self._run_load(*opt)
+
+ @testing.combinations(
+ (("joined", "joined", "joined", "joined"),),
+ (("selectin", "selectin", "selectin", "selectin"),),
+ (("selectin", "selectin", "joined", "joined"),),
+ (("selectin", "selectin", "joined", "selectin"),),
+ (("joined", "selectin", "joined", "selectin"),),
+ # TODO: immediateload (and lazyload) do not support the target item
+ # being a with_polymorphic. this seems to be a limitation in the
+ # current_path logic
+ # (("immediate", "joined", "joined", "joined"),),
+ argnames="loaders",
+ )
+ @testing.variation("use_wpoly", [True, False])
+ def test_all_load(self, loaders, use_wpoly):
+ A, Related, SubRelated, OtherRelated = self.classes(
+ "A", "Related", "SubRelated", "OtherRelated"
+ )
+
+ if use_wpoly:
+ related = with_polymorphic(Related, [SubRelated], flat=True)
+ else:
+ related = SubRelated
+
+ opt = None
+ for i, (load_type, element) in enumerate(
+ zip(
+ loaders,
+ [
+ A.related.of_type(related),
+ related.other_related,
+ OtherRelated.parent,
+ OtherRelated.parent,
+ ],
+ )
+ ):
+ if i == 0:
+ if load_type == "joined":
+ opt = joinedload(element)
+ elif load_type == "selectin":
+ opt = selectinload(element)
+ elif load_type == "immediate":
+ opt = immediateload(element)
+ else:
+ assert False
+ else:
+ assert opt is not None
+ if load_type == "joined":
+ opt = opt.joinedload(element)
+ elif load_type == "selectin":
+ opt = opt.selectinload(element)
+ elif load_type == "immediate":
+ opt = opt.immediateload(element)
+ else:
+ assert False
+
+ self._run_load(opt)