)
eq_(len(retrieved.children), 1)
+
+
+@testing.combinations(
+ ("single",),
+ ("joined",),
+ id_="s",
+ argnames="inheritance_type",
+)
+class MultiOfTypeContainsEagerTest(fixtures.DeclarativeMappedTest):
+ """test for #10006"""
+
+ @classmethod
+ def setup_classes(cls):
+ Base = cls.DeclarativeBasic
+
+ employee_m2m = Table(
+ "employee_m2m",
+ Base.metadata,
+ Column(
+ "left", Integer, ForeignKey("employee.id"), primary_key=True
+ ),
+ Column(
+ "right", Integer, ForeignKey("employee.id"), primary_key=True
+ ),
+ )
+
+ class Property(ComparableEntity, Base):
+ __tablename__ = "property"
+ id: Mapped[int] = mapped_column(primary_key=True)
+ value: Mapped[str] = mapped_column(name="value")
+ user_id: Mapped[int] = mapped_column(ForeignKey("employee.id"))
+
+ class Employee(ComparableEntity, Base):
+ __tablename__ = "employee"
+ id: Mapped[int] = mapped_column(primary_key=True)
+ name: Mapped[str]
+ type: Mapped[str]
+ prop1 = relationship(Property, lazy="raise", uselist=False)
+
+ colleagues = relationship(
+ "Employee",
+ secondary=employee_m2m,
+ primaryjoin=lambda: Employee.id == employee_m2m.c.left,
+ secondaryjoin=lambda: Employee.id == employee_m2m.c.right,
+ lazy="raise",
+ collection_class=set,
+ )
+
+ __mapper_args__ = {
+ "polymorphic_on": "type",
+ "polymorphic_identity": "employee",
+ }
+
+ class Manager(Employee):
+ if cls.inheritance_type == "joined":
+ __tablename__ = "manager"
+ id: Mapped[int] = mapped_column( # noqa: A001
+ ForeignKey("employee.id"), primary_key=True
+ )
+ __mapper_args__ = {"polymorphic_identity": "manager"}
+
+ class Engineer(Employee):
+ if cls.inheritance_type == "joined":
+ __tablename__ = "engineer"
+ id: Mapped[int] = mapped_column( # noqa: A001
+ ForeignKey("employee.id"), primary_key=True
+ )
+ __mapper_args__ = {"polymorphic_identity": "engineer"}
+
+ class Clerk(Employee):
+ if cls.inheritance_type == "joined":
+ __tablename__ = "clerk"
+ id: Mapped[int] = mapped_column( # noqa: A001
+ ForeignKey("employee.id"), primary_key=True
+ )
+ __mapper_args__ = {"polymorphic_identity": "clerk"}
+
+ class UnitHead(Employee):
+ if cls.inheritance_type == "joined":
+ __tablename__ = "unithead"
+ id: Mapped[int] = mapped_column( # noqa: A001
+ ForeignKey("employee.id"), primary_key=True
+ )
+ managers = relationship(
+ "Manager",
+ secondary=employee_m2m,
+ primaryjoin=lambda: Employee.id == employee_m2m.c.left,
+ secondaryjoin=lambda: (
+ and_(
+ Employee.id == employee_m2m.c.right,
+ Employee.type == "manager",
+ )
+ ),
+ viewonly=True,
+ lazy="raise",
+ collection_class=set,
+ )
+ __mapper_args__ = {"polymorphic_identity": "unithead"}
+
+ @classmethod
+ def insert_data(cls, connection):
+ UnitHead, Manager, Engineer, Clerk, Property = cls.classes(
+ "UnitHead", "Manager", "Engineer", "Clerk", "Property"
+ )
+
+ with Session(connection) as sess:
+ unithead = UnitHead(
+ type="unithead",
+ name="unithead1",
+ prop1=Property(value="val unithead"),
+ )
+ manager = Manager(
+ type="manager",
+ name="manager1",
+ prop1=Property(value="val manager"),
+ )
+ other_manager = Manager(
+ type="manager",
+ name="manager2",
+ prop1=Property(value="val other manager"),
+ )
+ engineer = Engineer(
+ type="engineer",
+ name="engineer1",
+ prop1=Property(value="val engineer"),
+ )
+ clerk = Clerk(
+ type="clerk", name="clerk1", prop1=Property(value="val clerk")
+ )
+ unithead.colleagues.update([manager, other_manager])
+ manager.colleagues.update([engineer, clerk])
+ sess.add_all([unithead, manager, other_manager, engineer, clerk])
+ sess.commit()
+
+ @testing.variation("query_type", ["joinedload", "contains_eager"])
+ @testing.variation("use_criteria", [True, False])
+ def test_big_query(self, query_type, use_criteria):
+ Employee, UnitHead, Manager, Engineer, Clerk, Property = self.classes(
+ "Employee", "UnitHead", "Manager", "Engineer", "Clerk", "Property"
+ )
+
+ if query_type.contains_eager:
+ mgr = aliased(Manager)
+ clg = aliased(Employee)
+ clgs_prop1 = aliased(Property, name="clgs_prop1")
+
+ query = (
+ select(UnitHead)
+ .options(
+ contains_eager(UnitHead.managers.of_type(mgr))
+ .contains_eager(mgr.colleagues.of_type(clg))
+ .contains_eager(clg.prop1.of_type(clgs_prop1)),
+ )
+ .outerjoin(UnitHead.managers.of_type(mgr))
+ .outerjoin(mgr.colleagues.of_type(clg))
+ .outerjoin(clg.prop1.of_type(clgs_prop1))
+ )
+ if use_criteria:
+ ma_prop1 = aliased(Property)
+ uhead_prop1 = aliased(Property)
+ query = (
+ query.outerjoin(UnitHead.prop1.of_type(uhead_prop1))
+ .outerjoin(mgr.prop1.of_type(ma_prop1))
+ .where(
+ uhead_prop1.value == "val unithead",
+ ma_prop1.value == "val manager",
+ clgs_prop1.value == "val engineer",
+ )
+ )
+ elif query_type.joinedload:
+ if use_criteria:
+ query = (
+ select(UnitHead)
+ .options(
+ joinedload(
+ UnitHead.managers.and_(
+ Manager.prop1.has(value="val manager")
+ )
+ )
+ .joinedload(
+ Manager.colleagues.and_(
+ Employee.prop1.has(value="val engineer")
+ )
+ )
+ .joinedload(Employee.prop1),
+ )
+ .where(UnitHead.prop1.has(value="val unithead"))
+ )
+ else:
+ query = select(UnitHead).options(
+ joinedload(UnitHead.managers)
+ .joinedload(Manager.colleagues)
+ .joinedload(Employee.prop1),
+ )
+
+ session = fixture_session()
+ head = session.scalars(query).unique().one()
+
+ if use_criteria:
+ expected_managers = {
+ Manager(
+ name="manager1",
+ colleagues={Engineer(name="engineer1", prop1=Property())},
+ )
+ }
+ else:
+ expected_managers = {
+ Manager(
+ name="manager1",
+ colleagues={
+ Engineer(name="engineer1", prop1=Property()),
+ Clerk(name="clerk1"),
+ },
+ ),
+ Manager(name="manager2"),
+ }
+ eq_(
+ head,
+ UnitHead(managers=expected_managers),
+ )