]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
warnings: with_polymorphic()
authorMike Bayer <mike_mp@zzzcomputing.com>
Fri, 29 Oct 2021 16:10:37 +0000 (12:10 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Fri, 29 Oct 2021 20:23:08 +0000 (16:23 -0400)
Also clarifies a behavior of None/False for the selectable
parameter to with_polymorphic()

Fixes: #7262
Change-Id: I58c4004e0af227d3995e9ae2461470440f97f252

lib/sqlalchemy/orm/util.py
lib/sqlalchemy/testing/warnings.py
test/orm/declarative/test_inheritance.py
test/orm/inheritance/test_concrete.py
test/orm/inheritance/test_deprecations.py
test/orm/inheritance/test_polymorphic_rel.py
test/orm/inheritance/test_relationship.py
test/orm/inheritance/test_single.py
test/orm/test_cache_key.py
test/orm/test_deprecations.py
test/orm/test_subquery_relations.py

index 67be165f6b39a7945f9ae7ecef1d1b25e8936ad7..0e8449068095180ee6e515b6954f1aa46b96eaa0 100644 (file)
@@ -1359,6 +1359,14 @@ def with_polymorphic(
         result in their table being appended directly to the FROM clause
         which will usually lead to incorrect results.
 
+        When left at its default value of ``False``, the polymorphic
+        selectable assigned to the base mapper is used for selecting rows.
+        However, it may also be passed as ``None``, which will bypass the
+        configured polymorphic selectable and instead construct an ad-hoc
+        selectable for the target classes given; for joined table inheritance
+        this will be a join that includes all target mappers and their
+        subclasses.
+
     :param polymorphic_on: a column to be used as the "discriminator"
         column for the given selectable. If not given, the polymorphic_on
         attribute of the base classes' mapper will be used, if any. This
index ae2c7916d9c0283125495c3dde399bd205d04438..2a0c486d2306f75b6ac396793a3f7b0167babdc7 100644 (file)
@@ -65,11 +65,6 @@ def setup_filters():
     # be "error" however for  I98b8defdf7c37b818b3824d02f7668e3f5f31c94
     # we are moving one at a time
     for msg in [
-        #
-        # ORM Query
-        #
-        r"The Query.with_polymorphic\(\) method is considered "
-        "legacy as of the 1.x series",
         #
         # ORM Session
         #
index 1740ee90b3a7da80e86a00194923feaadd601b40..7e43e255954d203a29ce097e424b9a2cc0304dcf 100644 (file)
@@ -9,6 +9,7 @@ from sqlalchemy.orm import configure_mappers
 from sqlalchemy.orm import declared_attr
 from sqlalchemy.orm import deferred
 from sqlalchemy.orm import relationship
+from sqlalchemy.orm import with_polymorphic
 from sqlalchemy.orm.decl_api import registry
 from sqlalchemy.testing import assert_raises
 from sqlalchemy.testing import assert_raises_message
@@ -967,10 +968,11 @@ class DeclarativeInheritanceTest(DeclarativeTestBase):
         sess.add(c2)
         sess.flush()
         sess.expunge_all()
+
+        wp = with_polymorphic(Person, [Engineer])
         eq_(
-            sess.query(Person)
-            .with_polymorphic(Engineer)
-            .filter(Engineer.primary_language == "cobol")
+            sess.query(wp)
+            .filter(wp.Engineer.primary_language == "cobol")
             .first(),
             Engineer(name="vlad"),
         )
index 63a170581762402252653f3a9ea83af16d99dc34..d9dfa3d9e6cb383cfb45395a1818d2eeaea1c73a 100644 (file)
@@ -1,7 +1,11 @@
 from sqlalchemy import ForeignKey
 from sqlalchemy import Integer
+from sqlalchemy import literal
+from sqlalchemy import null
+from sqlalchemy import select
 from sqlalchemy import String
 from sqlalchemy import testing
+from sqlalchemy import union_all
 from sqlalchemy.ext.hybrid import hybrid_property
 from sqlalchemy.orm import attributes
 from sqlalchemy.orm import class_mapper
@@ -10,6 +14,7 @@ from sqlalchemy.orm import configure_mappers
 from sqlalchemy.orm import joinedload
 from sqlalchemy.orm import polymorphic_union
 from sqlalchemy.orm import relationship
+from sqlalchemy.orm.util import with_polymorphic
 from sqlalchemy.testing import assert_raises
 from sqlalchemy.testing import assert_raises_message
 from sqlalchemy.testing import eq_
@@ -20,68 +25,10 @@ from sqlalchemy.testing.schema import Column
 from sqlalchemy.testing.schema import Table
 
 
-class Employee(object):
-    def __init__(self, name):
-        self.name = name
-
-    def __repr__(self):
-        return self.__class__.__name__ + " " + self.name
-
-
-class Manager(Employee):
-    def __init__(self, name, manager_data):
-        self.name = name
-        self.manager_data = manager_data
-
-    def __repr__(self):
-        return (
-            self.__class__.__name__ + " " + self.name + " " + self.manager_data
-        )
-
-
-class Engineer(Employee):
-    def __init__(self, name, engineer_info):
-        self.name = name
-        self.engineer_info = engineer_info
-
-    def __repr__(self):
-        return (
-            self.__class__.__name__
-            + " "
-            + self.name
-            + " "
-            + self.engineer_info
-        )
-
-
-class Hacker(Engineer):
-    def __init__(self, name, nickname, engineer_info):
-        self.name = name
-        self.nickname = nickname
-        self.engineer_info = engineer_info
-
-    def __repr__(self):
-        return (
-            self.__class__.__name__
-            + " "
-            + self.name
-            + " '"
-            + self.nickname
-            + "' "
-            + self.engineer_info
-        )
-
-
-class Company(object):
-    pass
-
-
 class ConcreteTest(fixtures.MappedTest):
     @classmethod
     def define_tables(cls, metadata):
-        global managers_table, engineers_table, hackers_table
-        global companies, employees_table
-        companies = Table(
+        Table(
             "companies",
             metadata,
             Column(
@@ -89,7 +36,7 @@ class ConcreteTest(fixtures.MappedTest):
             ),
             Column("name", String(50)),
         )
-        employees_table = Table(
+        Table(
             "employees",
             metadata,
             Column(
@@ -101,7 +48,7 @@ class ConcreteTest(fixtures.MappedTest):
             Column("name", String(50)),
             Column("company_id", Integer, ForeignKey("companies.id")),
         )
-        managers_table = Table(
+        Table(
             "managers",
             metadata,
             Column(
@@ -114,7 +61,7 @@ class ConcreteTest(fixtures.MappedTest):
             Column("manager_data", String(50)),
             Column("company_id", Integer, ForeignKey("companies.id")),
         )
-        engineers_table = Table(
+        Table(
             "engineers",
             metadata,
             Column(
@@ -127,7 +74,7 @@ class ConcreteTest(fixtures.MappedTest):
             Column("engineer_info", String(50)),
             Column("company_id", Integer, ForeignKey("companies.id")),
         )
-        hackers_table = Table(
+        Table(
             "hackers",
             metadata,
             Column(
@@ -142,7 +89,69 @@ class ConcreteTest(fixtures.MappedTest):
             Column("nickname", String(50)),
         )
 
+    @classmethod
+    def setup_classes(cls):
+        class Employee(cls.Basic):
+            def __init__(self, name):
+                self.name = name
+
+            def __repr__(self):
+                return self.__class__.__name__ + " " + self.name
+
+        class Manager(Employee):
+            def __init__(self, name, manager_data):
+                self.name = name
+                self.manager_data = manager_data
+
+            def __repr__(self):
+                return (
+                    self.__class__.__name__
+                    + " "
+                    + self.name
+                    + " "
+                    + self.manager_data
+                )
+
+        class Engineer(Employee):
+            def __init__(self, name, engineer_info):
+                self.name = name
+                self.engineer_info = engineer_info
+
+            def __repr__(self):
+                return (
+                    self.__class__.__name__
+                    + " "
+                    + self.name
+                    + " "
+                    + self.engineer_info
+                )
+
+        class Hacker(Engineer):
+            def __init__(self, name, nickname, engineer_info):
+                self.name = name
+                self.nickname = nickname
+                self.engineer_info = engineer_info
+
+            def __repr__(self):
+                return (
+                    self.__class__.__name__
+                    + " "
+                    + self.name
+                    + " '"
+                    + self.nickname
+                    + "' "
+                    + self.engineer_info
+                )
+
+        class Company(cls.Basic):
+            pass
+
     def test_basic(self):
+        Employee, Engineer, Manager = self.classes(
+            "Employee", "Engineer", "Manager"
+        )
+        engineers_table, managers_table = self.tables("engineers", "managers")
+
         pjoin = polymorphic_union(
             {"manager": managers_table, "engineer": engineers_table},
             "type",
@@ -166,27 +175,34 @@ class ConcreteTest(fixtures.MappedTest):
             polymorphic_identity="engineer",
         )
         session = fixture_session()
-        session.add(Manager("Tom", "knows how to manage things"))
-        session.add(Engineer("Kurt", "knows how to hack"))
+        session.add(Manager("Sally", "knows how to manage things"))
+        session.add(Engineer("Karina", "knows how to hack"))
         session.flush()
         session.expunge_all()
         assert set([repr(x) for x in session.query(Employee)]) == set(
             [
-                "Engineer Kurt knows how to hack",
-                "Manager Tom knows how to manage things",
+                "Engineer Karina knows how to hack",
+                "Manager Sally knows how to manage things",
             ]
         )
         assert set([repr(x) for x in session.query(Manager)]) == set(
-            ["Manager Tom knows how to manage things"]
+            ["Manager Sally knows how to manage things"]
         )
         assert set([repr(x) for x in session.query(Engineer)]) == set(
-            ["Engineer Kurt knows how to hack"]
+            ["Engineer Karina knows how to hack"]
         )
         manager = session.query(Manager).one()
         session.expire(manager, ["manager_data"])
         eq_(manager.manager_data, "knows how to manage things")
 
     def test_multi_level_no_base(self):
+        Employee, Engineer, Manager = self.classes(
+            "Employee", "Engineer", "Manager"
+        )
+        (Hacker,) = self.classes("Hacker")
+        engineers_table, managers_table = self.tables("engineers", "managers")
+        (hackers_table,) = self.tables("hackers")
+
         pjoin = polymorphic_union(
             {
                 "manager": managers_table,
@@ -228,19 +244,19 @@ class ConcreteTest(fixtures.MappedTest):
             polymorphic_identity="hacker",
         )
         session = fixture_session()
-        tom = Manager("Tom", "knows how to manage things")
+        sally = Manager("Sally", "knows how to manage things")
 
         assert_raises_message(
             AttributeError,
             "does not implement attribute .?'type' at the instance level.",
             setattr,
-            tom,
+            sally,
             "type",
             "sometype",
         )
 
-        jerry = Engineer("Jerry", "knows how to program")
-        hacker = Hacker("Kurt", "Badass", "knows how to hack")
+        jenn = Engineer("Jenn", "knows how to program")
+        hacker = Hacker("Karina", "Badass", "knows how to hack")
 
         assert_raises_message(
             AttributeError,
@@ -251,7 +267,7 @@ class ConcreteTest(fixtures.MappedTest):
             "sometype",
         )
 
-        session.add_all((tom, jerry, hacker))
+        session.add_all((sally, jenn, hacker))
         session.flush()
 
         # ensure "readonly" on save logic didn't pollute the
@@ -259,11 +275,9 @@ class ConcreteTest(fixtures.MappedTest):
 
         assert (
             "nickname"
-            not in attributes.instance_state(jerry).expired_attributes
-        )
-        assert (
-            "name" not in attributes.instance_state(jerry).expired_attributes
+            not in attributes.instance_state(jenn).expired_attributes
         )
+        assert "name" not in attributes.instance_state(jenn).expired_attributes
         assert (
             "name" not in attributes.instance_state(hacker).expired_attributes
         )
@@ -273,40 +287,49 @@ class ConcreteTest(fixtures.MappedTest):
         )
 
         def go():
-            eq_(jerry.name, "Jerry")
+            eq_(jenn.name, "Jenn")
             eq_(hacker.nickname, "Badass")
 
         self.assert_sql_count(testing.db, go, 0)
         session.expunge_all()
         assert (
-            repr(session.query(Employee).filter(Employee.name == "Tom").one())
-            == "Manager Tom knows how to manage things"
+            repr(
+                session.query(Employee).filter(Employee.name == "Sally").one()
+            )
+            == "Manager Sally knows how to manage things"
         )
         assert (
-            repr(session.query(Manager).filter(Manager.name == "Tom").one())
-            == "Manager Tom knows how to manage things"
+            repr(session.query(Manager).filter(Manager.name == "Sally").one())
+            == "Manager Sally knows how to manage things"
         )
         assert set([repr(x) for x in session.query(Employee).all()]) == set(
             [
-                "Engineer Jerry knows how to program",
-                "Manager Tom knows how to manage things",
-                "Hacker Kurt 'Badass' knows how to hack",
+                "Engineer Jenn knows how to program",
+                "Manager Sally knows how to manage things",
+                "Hacker Karina 'Badass' knows how to hack",
             ]
         )
         assert set([repr(x) for x in session.query(Manager).all()]) == set(
-            ["Manager Tom knows how to manage things"]
+            ["Manager Sally knows how to manage things"]
         )
         assert set([repr(x) for x in session.query(Engineer).all()]) == set(
             [
-                "Engineer Jerry knows how to program",
-                "Hacker Kurt 'Badass' knows how to hack",
+                "Engineer Jenn knows how to program",
+                "Hacker Karina 'Badass' knows how to hack",
             ]
         )
         assert set([repr(x) for x in session.query(Hacker).all()]) == set(
-            ["Hacker Kurt 'Badass' knows how to hack"]
+            ["Hacker Karina 'Badass' knows how to hack"]
         )
 
     def test_multi_level_no_base_w_hybrid(self):
+        Employee, Engineer, Manager = self.classes(
+            "Employee", "Engineer", "Manager"
+        )
+        (Hacker,) = self.classes("Hacker")
+        engineers_table, managers_table = self.tables("engineers", "managers")
+        (hackers_table,) = self.tables("hackers")
+
         pjoin = polymorphic_union(
             {
                 "manager": managers_table,
@@ -353,21 +376,21 @@ class ConcreteTest(fixtures.MappedTest):
         )
 
         session = fixture_session()
-        tom = ManagerWHybrid("Tom", "mgrdata")
+        sally = ManagerWHybrid("Sally", "mgrdata")
 
         # mapping did not impact the engineer_info
         # hybrid in any way
         eq_(test_calls.mock_calls, [])
 
-        eq_(tom.engineer_info, "mgrdata")
+        eq_(sally.engineer_info, "mgrdata")
         eq_(test_calls.mock_calls, [mock.call.engineer_info_instance()])
 
-        session.add(tom)
+        session.add(sally)
         session.commit()
 
         session.close()
 
-        tom = (
+        Sally = (
             session.query(ManagerWHybrid)
             .filter(ManagerWHybrid.engineer_info == "mgrdata")
             .one()
@@ -379,9 +402,18 @@ class ConcreteTest(fixtures.MappedTest):
                 mock.call.engineer_info_class(),
             ],
         )
-        eq_(tom.engineer_info, "mgrdata")
+        eq_(Sally.engineer_info, "mgrdata")
 
     def test_multi_level_with_base(self):
+        Employee, Engineer, Manager = self.classes(
+            "Employee", "Engineer", "Manager"
+        )
+        employees_table, engineers_table, managers_table = self.tables(
+            "employees", "engineers", "managers"
+        )
+        (hackers_table,) = self.tables("hackers")
+        (Hacker,) = self.classes("Hacker")
+
         pjoin = polymorphic_union(
             {
                 "employee": employees_table,
@@ -427,14 +459,14 @@ class ConcreteTest(fixtures.MappedTest):
             polymorphic_identity="hacker",
         )
         session = fixture_session()
-        tom = Manager("Tom", "knows how to manage things")
-        jerry = Engineer("Jerry", "knows how to program")
-        hacker = Hacker("Kurt", "Badass", "knows how to hack")
-        session.add_all((tom, jerry, hacker))
+        sally = Manager("Sally", "knows how to manage things")
+        jenn = Engineer("Jenn", "knows how to program")
+        hacker = Hacker("Karina", "Badass", "knows how to hack")
+        session.add_all((sally, jenn, hacker))
         session.flush()
 
         def go():
-            eq_(jerry.name, "Jerry")
+            eq_(jenn.name, "Jenn")
             eq_(hacker.nickname, "Badass")
 
         self.assert_sql_count(testing.db, go, 0)
@@ -455,25 +487,34 @@ class ConcreteTest(fixtures.MappedTest):
         )
         assert set([repr(x) for x in session.query(Employee)]) == set(
             [
-                "Engineer Jerry knows how to program",
-                "Manager Tom knows how to manage things",
-                "Hacker Kurt 'Badass' knows how to hack",
+                "Engineer Jenn knows how to program",
+                "Manager Sally knows how to manage things",
+                "Hacker Karina 'Badass' knows how to hack",
             ]
         )
         assert set([repr(x) for x in session.query(Manager)]) == set(
-            ["Manager Tom knows how to manage things"]
+            ["Manager Sally knows how to manage things"]
         )
         assert set([repr(x) for x in session.query(Engineer)]) == set(
             [
-                "Engineer Jerry knows how to program",
-                "Hacker Kurt 'Badass' knows how to hack",
+                "Engineer Jenn knows how to program",
+                "Hacker Karina 'Badass' knows how to hack",
             ]
         )
         assert set([repr(x) for x in session.query(Hacker)]) == set(
-            ["Hacker Kurt 'Badass' knows how to hack"]
+            ["Hacker Karina 'Badass' knows how to hack"]
         )
 
-    def test_without_default_polymorphic(self):
+    @testing.fixture
+    def two_pjoin_fixture(self):
+        Employee, Engineer, Manager = self.classes(
+            "Employee", "Engineer", "Manager"
+        )
+        (Hacker,) = self.classes("Hacker")
+        (employees_table,) = self.tables("employees")
+        engineers_table, managers_table = self.tables("engineers", "managers")
+        (hackers_table,) = self.tables("hackers")
+
         pjoin = polymorphic_union(
             {
                 "employee": employees_table,
@@ -513,106 +554,248 @@ class ConcreteTest(fixtures.MappedTest):
             concrete=True,
             polymorphic_identity="hacker",
         )
-        session = fixture_session()
+
+        session = fixture_session(expire_on_commit=False)
         jdoe = Employee("Jdoe")
-        tom = Manager("Tom", "knows how to manage things")
-        jerry = Engineer("Jerry", "knows how to program")
-        hacker = Hacker("Kurt", "Badass", "knows how to hack")
-        session.add_all((jdoe, tom, jerry, hacker))
-        session.flush()
+        sally = Manager("Sally", "knows how to manage things")
+        jenn = Engineer("Jenn", "knows how to program")
+        hacker = Hacker("Karina", "Badass", "knows how to hack")
+        session.add_all((jdoe, sally, jenn, hacker))
+        session.commit()
+
+        return (
+            session,
+            Employee,
+            Engineer,
+            Manager,
+            Hacker,
+            pjoin,
+            pjoin2,
+            jdoe,
+            sally,
+            jenn,
+            hacker,
+        )
+
+    def test_without_default_polymorphic_one(self, two_pjoin_fixture):
+        (
+            session,
+            Employee,
+            Engineer,
+            Manager,
+            Hacker,
+            pjoin,
+            pjoin2,
+            jdoe,
+            sally,
+            jenn,
+            hacker,
+        ) = two_pjoin_fixture
+
+        wp = with_polymorphic(
+            Employee, "*", pjoin, polymorphic_on=pjoin.c.type
+        )
+
         eq_(
-            len(
-                session.connection()
-                .execute(
-                    session.query(Employee)
-                    .with_polymorphic("*", pjoin, pjoin.c.type)
-                    .statement
-                )
-                .fetchall()
-            ),
-            4,
+            sorted([repr(x) for x in session.query(wp)]),
+            [
+                "Employee Jdoe",
+                "Engineer Jenn knows how to program",
+                "Hacker Karina 'Badass' knows how to hack",
+                "Manager Sally knows how to manage things",
+            ],
         )
         eq_(session.get(Employee, jdoe.employee_id), jdoe)
-        eq_(session.get(Engineer, jerry.employee_id), jerry)
+        eq_(session.get(Engineer, jenn.employee_id), jenn)
+
+    def test_without_default_polymorphic_two(self, two_pjoin_fixture):
+        (
+            session,
+            Employee,
+            Engineer,
+            Manager,
+            Hacker,
+            pjoin,
+            pjoin2,
+            jdoe,
+            sally,
+            jenn,
+            hacker,
+        ) = two_pjoin_fixture
+        wp = with_polymorphic(
+            Employee, "*", pjoin, polymorphic_on=pjoin.c.type
+        )
+
         eq_(
-            set(
-                [
-                    repr(x)
-                    for x in session.query(Employee).with_polymorphic(
-                        "*", pjoin, pjoin.c.type
-                    )
-                ]
-            ),
-            set(
-                [
-                    "Employee Jdoe",
-                    "Engineer Jerry knows how to program",
-                    "Manager Tom knows how to manage things",
-                    "Hacker Kurt 'Badass' knows how to hack",
-                ]
-            ),
+            sorted([repr(x) for x in session.query(wp)]),
+            [
+                "Employee Jdoe",
+                "Engineer Jenn knows how to program",
+                "Hacker Karina 'Badass' knows how to hack",
+                "Manager Sally knows how to manage things",
+            ],
         )
+
+    def test_without_default_polymorphic_three(self, two_pjoin_fixture):
+        (
+            session,
+            Employee,
+            Engineer,
+            Manager,
+            Hacker,
+            pjoin,
+            pjoin2,
+            jdoe,
+            sally,
+            jenn,
+            hacker,
+        ) = two_pjoin_fixture
         eq_(
-            set([repr(x) for x in session.query(Manager)]),
-            set(["Manager Tom knows how to manage things"]),
+            sorted([repr(x) for x in session.query(Manager)]),
+            ["Manager Sally knows how to manage things"],
+        )
+
+    def test_without_default_polymorphic_four(self, two_pjoin_fixture):
+        (
+            session,
+            Employee,
+            Engineer,
+            Manager,
+            Hacker,
+            pjoin,
+            pjoin2,
+            jdoe,
+            sally,
+            jenn,
+            hacker,
+        ) = two_pjoin_fixture
+        wp2 = with_polymorphic(
+            Engineer, "*", pjoin2, polymorphic_on=pjoin2.c.type
         )
         eq_(
-            set(
-                [
-                    repr(x)
-                    for x in session.query(Engineer).with_polymorphic(
-                        "*", pjoin2, pjoin2.c.type
-                    )
-                ]
-            ),
-            set(
-                [
-                    "Engineer Jerry knows how to program",
-                    "Hacker Kurt 'Badass' knows how to hack",
-                ]
-            ),
+            sorted([repr(x) for x in session.query(wp2)]),
+            [
+                "Engineer Jenn knows how to program",
+                "Hacker Karina 'Badass' knows how to hack",
+            ],
         )
+
+    def test_without_default_polymorphic_five(self, two_pjoin_fixture):
+        (
+            session,
+            Employee,
+            Engineer,
+            Manager,
+            Hacker,
+            pjoin,
+            pjoin2,
+            jdoe,
+            sally,
+            jenn,
+            hacker,
+        ) = two_pjoin_fixture
         eq_(
-            set([repr(x) for x in session.query(Hacker)]),
-            set(["Hacker Kurt 'Badass' knows how to hack"]),
+            [repr(x) for x in session.query(Hacker)],
+            ["Hacker Karina 'Badass' knows how to hack"],
         )
 
-        # test adaption of the column by wrapping the query in a
-        # subquery
+    def test_without_default_polymorphic_six(self, two_pjoin_fixture):
+        (
+            session,
+            Employee,
+            Engineer,
+            Manager,
+            Hacker,
+            pjoin,
+            pjoin2,
+            jdoe,
+            sally,
+            jenn,
+            hacker,
+        ) = two_pjoin_fixture
 
-        with testing.expect_deprecated(r"The Query.from_self\(\) method"):
-            eq_(
-                len(
-                    session.connection()
-                    .execute(
-                        session.query(Engineer)
-                        .with_polymorphic("*", pjoin2, pjoin2.c.type)
-                        .from_self()
-                        .statement
-                    )
-                    .fetchall()
-                ),
-                2,
+        # this test is adapting what used to use from_self().
+        # it's a weird test but how we would do this would be we would only
+        # apply with_polymorprhic once, after we've created whatever
+        # subquery we want.
+
+        subq = pjoin2.select().subquery()
+
+        wp2 = with_polymorphic(Engineer, "*", subq, polymorphic_on=subq.c.type)
+
+        eq_(
+            sorted([repr(x) for x in session.query(wp2)]),
+            [
+                "Engineer Jenn knows how to program",
+                "Hacker Karina 'Badass' knows how to hack",
+            ],
+        )
+
+    @testing.combinations(True, False, argnames="use_star")
+    def test_without_default_polymorphic_buildit_newstyle(
+        self, two_pjoin_fixture, use_star
+    ):
+        """how would we do these concrete polymorphic queries using 2.0 style,
+        and not any old and esoteric features like "polymorphic_union" ?
+
+        """
+        (
+            session,
+            Employee,
+            Engineer,
+            Manager,
+            Hacker,
+            pjoin,
+            pjoin2,
+            jdoe,
+            sally,
+            jenn,
+            hacker,
+        ) = two_pjoin_fixture
+
+        # make a union using the entities as given and wpoly from it.
+        # a UNION is a UNION.  there is no way around having to write
+        # out filler columns.  concrete inh is really not a good choice
+        # when you need to select heterogeneously
+        stmt = union_all(
+            select(
+                literal("engineer").label("type"),
+                Engineer,
+                null().label("nickname"),
+            ),
+            select(literal("hacker").label("type"), Hacker),
+        ).subquery()
+
+        # issue: if we make this with_polymorphic(Engineer, [Hacker], ...),
+        # it blows up and tries to add the "engineer" table for unknown reasons
+
+        if use_star:
+            wp = with_polymorphic(
+                Engineer, "*", stmt, polymorphic_on=stmt.c.type
             )
-        with testing.expect_deprecated(r"The Query.from_self\(\) method"):
-            eq_(
-                set(
-                    [
-                        repr(x)
-                        for x in session.query(Engineer)
-                        .with_polymorphic("*", pjoin2, pjoin2.c.type)
-                        .from_self()
-                    ]
-                ),
-                set(
-                    [
-                        "Engineer Jerry knows how to program",
-                        "Hacker Kurt 'Badass' knows how to hack",
-                    ]
-                ),
+        else:
+            wp = with_polymorphic(
+                Engineer, [Engineer, Hacker], stmt, polymorphic_on=stmt.c.type
             )
 
+        result = session.execute(select(wp)).scalars()
+
+        eq_(
+            sorted(repr(obj) for obj in result),
+            [
+                "Engineer Jenn knows how to program",
+                "Hacker Karina 'Badass' knows how to hack",
+            ],
+        )
+
     def test_relationship(self):
+        Employee, Engineer, Manager = self.classes(
+            "Employee", "Engineer", "Manager"
+        )
+        (Company,) = self.classes("Company")
+        (companies,) = self.tables("companies")
+        engineers_table, managers_table = self.tables("engineers", "managers")
+
         pjoin = polymorphic_union(
             {"manager": managers_table, "engineer": engineers_table},
             "type",
@@ -642,8 +825,8 @@ class ConcreteTest(fixtures.MappedTest):
         )
         session = fixture_session()
         c = Company()
-        c.employees.append(Manager("Tom", "knows how to manage things"))
-        c.employees.append(Engineer("Kurt", "knows how to hack"))
+        c.employees.append(Manager("Sally", "knows how to manage things"))
+        c.employees.append(Engineer("Karina", "knows how to hack"))
         session.add(c)
         session.flush()
         session.expunge_all()
@@ -652,8 +835,8 @@ class ConcreteTest(fixtures.MappedTest):
             c2 = session.get(Company, c.id)
             assert set([repr(x) for x in c2.employees]) == set(
                 [
-                    "Engineer Kurt knows how to hack",
-                    "Manager Tom knows how to manage things",
+                    "Engineer Karina knows how to hack",
+                    "Manager Sally knows how to manage things",
                 ]
             )
 
@@ -666,8 +849,8 @@ class ConcreteTest(fixtures.MappedTest):
             )
             assert set([repr(x) for x in c2.employees]) == set(
                 [
-                    "Engineer Kurt knows how to hack",
-                    "Manager Tom knows how to manage things",
+                    "Engineer Karina knows how to hack",
+                    "Manager Sally knows how to manage things",
                 ]
             )
 
@@ -1055,7 +1238,7 @@ class PropertyInheritanceTest(fixtures.MappedTest):
         eq_(merged_c1.some_dest_id, c1.some_dest_id)
 
 
-class ManyToManyTest(fixtures.MappedTest):
+class ManySallyanyTest(fixtures.MappedTest):
     @classmethod
     def define_tables(cls, metadata):
         Table(
@@ -1073,7 +1256,7 @@ class ManyToManyTest(fixtures.MappedTest):
             ),
         )
         Table(
-            "base_mtom",
+            "base_mSally",
             metadata,
             Column(
                 "base_id", Integer, ForeignKey("base.id"), primary_key=True
@@ -1086,7 +1269,7 @@ class ManyToManyTest(fixtures.MappedTest):
             ),
         )
         Table(
-            "sub_mtom",
+            "sub_mSally",
             metadata,
             Column("base_id", Integer, ForeignKey("sub.id"), primary_key=True),
             Column(
@@ -1116,13 +1299,13 @@ class ManyToManyTest(fixtures.MappedTest):
             pass
 
     def test_selective_relationships(self):
-        sub, base_mtom, Related, Base, related, sub_mtom, base, Sub = (
+        sub, base_mSally, Related, Base, related, sub_mSally, base, Sub = (
             self.tables.sub,
-            self.tables.base_mtom,
+            self.tables.base_mSally,
             self.classes.Related,
             self.classes.Base,
             self.tables.related,
-            self.tables.sub_mtom,
+            self.tables.sub_mSally,
             self.tables.base,
             self.classes.Sub,
         )
@@ -1133,7 +1316,7 @@ class ManyToManyTest(fixtures.MappedTest):
             properties={
                 "related": relationship(
                     Related,
-                    secondary=base_mtom,
+                    secondary=base_mSally,
                     backref="bases",
                     order_by=related.c.id,
                 )
@@ -1147,7 +1330,7 @@ class ManyToManyTest(fixtures.MappedTest):
             properties={
                 "related": relationship(
                     Related,
-                    secondary=sub_mtom,
+                    secondary=sub_mSally,
                     backref="subs",
                     order_by=related.c.id,
                 )
index bf8219cb0205670497c048fb26e42bfd09c96b86..8c807c1152ed1d172aac4b2714d8ccf3aadb9687 100644 (file)
@@ -1,8 +1,20 @@
+from sqlalchemy import exc as sa_exc
 from sqlalchemy import ForeignKey
+from sqlalchemy import func
 from sqlalchemy import Integer
+from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
+from sqlalchemy import null
+from sqlalchemy import select
 from sqlalchemy import String
 from sqlalchemy import testing
+from sqlalchemy.orm import aliased
+from sqlalchemy.orm import joinedload
+from sqlalchemy.orm import polymorphic_union
 from sqlalchemy.orm import relationship
+from sqlalchemy.orm import selectinload
+from sqlalchemy.orm import Session
+from sqlalchemy.testing import assert_raises
+from sqlalchemy.testing import AssertsCompiledSQL
 from sqlalchemy.testing import eq_
 from sqlalchemy.testing import fixtures
 from sqlalchemy.testing.assertions import expect_deprecated_20
@@ -14,8 +26,10 @@ from ._poly_fixtures import _PolymorphicAliasedJoins
 from ._poly_fixtures import _PolymorphicJoins
 from ._poly_fixtures import _PolymorphicPolymorphic
 from ._poly_fixtures import _PolymorphicUnions
+from ._poly_fixtures import Boss
 from ._poly_fixtures import Company
 from ._poly_fixtures import Engineer
+from ._poly_fixtures import Machine
 from ._poly_fixtures import Manager
 from ._poly_fixtures import Paperwork
 from ._poly_fixtures import Person
@@ -26,6 +40,11 @@ aliased_jp_dep = (
     r"to Query.join\(\) are deprecated"
 )
 
+with_polymorphic_dep = (
+    r"The Query.with_polymorphic\(\) method is considered legacy as of "
+    r"the 1.x series of SQLAlchemy and will be removed in 2.0"
+)
+
 
 class _PolymorphicTestBase(fixtures.NoCache):
     __backend__ = True
@@ -87,7 +106,7 @@ class _PolymorphicTestBase(fixtures.NoCache):
 
     def test_join_from_with_polymorphic_flag_aliased_one(self):
         sess = fixture_session()
-        with expect_deprecated_20(aliased_jp_dep):
+        with expect_deprecated_20(aliased_jp_dep, with_polymorphic_dep):
             eq_(
                 sess.query(Person)
                 .with_polymorphic(Manager)
@@ -99,7 +118,7 @@ class _PolymorphicTestBase(fixtures.NoCache):
 
     def test_join_from_with_polymorphic_flag_aliased_two(self):
         sess = fixture_session()
-        with expect_deprecated_20(aliased_jp_dep):
+        with expect_deprecated_20(aliased_jp_dep, with_polymorphic_dep):
             eq_(
                 sess.query(Person)
                 .with_polymorphic([Manager, Engineer])
@@ -148,6 +167,263 @@ class _PolymorphicTestBase(fixtures.NoCache):
                 [e1],
             )
 
+    def test_with_polymorphic_one(self):
+        sess = fixture_session()
+
+        def go():
+            with expect_deprecated_20(with_polymorphic_dep):
+                eq_(
+                    sess.query(Person)
+                    .with_polymorphic(Engineer)
+                    .filter(Engineer.primary_language == "java")
+                    .all(),
+                    self._emps_wo_relationships_fixture()[0:1],
+                )
+
+        self.assert_sql_count(testing.db, go, 1)
+
+    def test_with_polymorphic_two(self):
+        sess = fixture_session()
+
+        def go():
+            with expect_deprecated_20(with_polymorphic_dep):
+                eq_(
+                    sess.query(Person)
+                    .with_polymorphic("*")
+                    .order_by(Person.person_id)
+                    .all(),
+                    self._emps_wo_relationships_fixture(),
+                )
+
+        self.assert_sql_count(testing.db, go, 1)
+
+    def test_with_polymorphic_three(self):
+        sess = fixture_session()
+
+        def go():
+            with expect_deprecated_20(with_polymorphic_dep):
+                eq_(
+                    sess.query(Person)
+                    .with_polymorphic(Engineer)
+                    .order_by(Person.person_id)
+                    .all(),
+                    self._emps_wo_relationships_fixture(),
+                )
+
+        self.assert_sql_count(testing.db, go, 3)
+
+    def test_with_polymorphic_four(self):
+        sess = fixture_session()
+
+        def go():
+            with expect_deprecated_20(with_polymorphic_dep):
+                eq_(
+                    sess.query(Person)
+                    .with_polymorphic(Engineer, people.outerjoin(engineers))
+                    .order_by(Person.person_id)
+                    .all(),
+                    self._emps_wo_relationships_fixture(),
+                )
+
+        self.assert_sql_count(testing.db, go, 3)
+
+    def test_with_polymorphic_five(self):
+        sess = fixture_session()
+
+        def go():
+            # limit the polymorphic join down to just "Person",
+            # overriding select_table
+            with expect_deprecated_20(with_polymorphic_dep):
+                eq_(
+                    sess.query(Person).with_polymorphic(Person).all(),
+                    self._emps_wo_relationships_fixture(),
+                )
+
+        self.assert_sql_count(testing.db, go, 6)
+
+    def test_with_polymorphic_six(self):
+        sess = fixture_session()
+
+        with expect_deprecated_20(with_polymorphic_dep):
+            assert_raises(
+                sa_exc.InvalidRequestError,
+                sess.query(Person).with_polymorphic,
+                Paperwork,
+            )
+        with expect_deprecated_20(with_polymorphic_dep):
+            assert_raises(
+                sa_exc.InvalidRequestError,
+                sess.query(Engineer).with_polymorphic,
+                Boss,
+            )
+        with expect_deprecated_20(with_polymorphic_dep):
+            assert_raises(
+                sa_exc.InvalidRequestError,
+                sess.query(Engineer).with_polymorphic,
+                Person,
+            )
+
+    def test_with_polymorphic_seven(self):
+        sess = fixture_session()
+        # compare to entities without related collections to prevent
+        # additional lazy SQL from firing on loaded entities
+        with expect_deprecated_20(with_polymorphic_dep):
+            eq_(
+                sess.query(Person)
+                .with_polymorphic("*")
+                .order_by(Person.person_id)
+                .all(),
+                self._emps_wo_relationships_fixture(),
+            )
+
+    def test_joinedload_on_subclass(self):
+        sess = fixture_session()
+        expected = [
+            Engineer(
+                name="dilbert",
+                engineer_name="dilbert",
+                primary_language="java",
+                status="regular engineer",
+                machines=[
+                    Machine(name="IBM ThinkPad"),
+                    Machine(name="IPhone"),
+                ],
+            )
+        ]
+
+        def go():
+            # test load People with joinedload to engineers + machines
+            with expect_deprecated_20(with_polymorphic_dep):
+                eq_(
+                    sess.query(Person)
+                    .with_polymorphic("*")
+                    .options(joinedload(Engineer.machines))
+                    .filter(Person.name == "dilbert")
+                    .all(),
+                    expected,
+                )
+
+        self.assert_sql_count(testing.db, go, 1)
+
+    def test_primary_eager_aliasing_three(self):
+
+        # assert the JOINs don't over JOIN
+
+        sess = fixture_session()
+
+        def go():
+            with expect_deprecated_20(with_polymorphic_dep):
+                eq_(
+                    sess.query(Person)
+                    .with_polymorphic("*")
+                    .order_by(Person.person_id)
+                    .options(joinedload(Engineer.machines))[1:3],
+                    all_employees[1:3],
+                )
+
+        self.assert_sql_count(testing.db, go, 3)
+
+        with expect_deprecated_20(with_polymorphic_dep):
+            eq_(
+                sess.scalar(
+                    select(func.count("*")).select_from(
+                        sess.query(Person)
+                        .with_polymorphic("*")
+                        .options(joinedload(Engineer.machines))
+                        .order_by(Person.person_id)
+                        .limit(2)
+                        .offset(1)
+                        .subquery()
+                    )
+                ),
+                2,
+            )
+
+    def test_join_from_with_polymorphic_nonaliased_one(self):
+        sess = fixture_session()
+        with expect_deprecated_20(with_polymorphic_dep):
+            eq_(
+                sess.query(Person)
+                .with_polymorphic(Manager)
+                .order_by(Person.person_id)
+                .join(Person.paperwork)
+                .filter(Paperwork.description.like("%review%"))
+                .all(),
+                [b1, m1],
+            )
+
+    def test_join_from_with_polymorphic_nonaliased_two(self):
+        sess = fixture_session()
+        with expect_deprecated_20(with_polymorphic_dep):
+            eq_(
+                sess.query(Person)
+                .with_polymorphic([Manager, Engineer])
+                .order_by(Person.person_id)
+                .join(Person.paperwork)
+                .filter(Paperwork.description.like("%#2%"))
+                .all(),
+                [e1, m1],
+            )
+
+    def test_join_from_with_polymorphic_nonaliased_three(self):
+        sess = fixture_session()
+        with expect_deprecated_20(with_polymorphic_dep):
+            eq_(
+                sess.query(Person)
+                .with_polymorphic([Manager, Engineer])
+                .order_by(Person.person_id)
+                .join(Person.paperwork)
+                .filter(Person.name.like("%dog%"))
+                .filter(Paperwork.description.like("%#2%"))
+                .all(),
+                [m1],
+            )
+
+    def test_join_from_with_polymorphic_explicit_aliased_one(self):
+        sess = fixture_session()
+        pa = aliased(Paperwork)
+
+        with expect_deprecated_20(with_polymorphic_dep):
+            eq_(
+                sess.query(Person)
+                .with_polymorphic(Manager)
+                .join(pa, Person.paperwork)
+                .filter(pa.description.like("%review%"))
+                .all(),
+                [b1, m1],
+            )
+
+    def test_join_from_with_polymorphic_explicit_aliased_two(self):
+        sess = fixture_session()
+        pa = aliased(Paperwork)
+
+        with expect_deprecated_20(with_polymorphic_dep):
+            eq_(
+                sess.query(Person)
+                .with_polymorphic([Manager, Engineer])
+                .order_by(Person.person_id)
+                .join(pa, Person.paperwork)
+                .filter(pa.description.like("%#2%"))
+                .all(),
+                [e1, m1],
+            )
+
+    def test_join_from_with_polymorphic_aliased_three(self):
+        sess = fixture_session()
+        pa = aliased(Paperwork)
+
+        with expect_deprecated_20(with_polymorphic_dep):
+            eq_(
+                sess.query(Person)
+                .with_polymorphic([Manager, Engineer])
+                .order_by(Person.person_id)
+                .join(pa, Person.paperwork)
+                .filter(Person.name.like("%dog%"))
+                .filter(pa.description.like("%#2%"))
+                .all(),
+                [m1],
+            )
+
 
 class PolymorphicTest(_PolymorphicTestBase, _Polymorphic):
     pass
@@ -258,3 +534,635 @@ class RelationshipToSingleTest(
                 "companies.company_id = employees_1.company_id "
                 "AND employees_1.type IN ([POSTCOMPILE_type_1])",
             )
+
+
+class SingleOnJoinedTest(fixtures.MappedTest):
+    @classmethod
+    def define_tables(cls, metadata):
+        global persons_table, employees_table
+
+        persons_table = Table(
+            "persons",
+            metadata,
+            Column(
+                "person_id",
+                Integer,
+                primary_key=True,
+                test_needs_autoincrement=True,
+            ),
+            Column("name", String(50)),
+            Column("type", String(20), nullable=False),
+        )
+
+        employees_table = Table(
+            "employees",
+            metadata,
+            Column(
+                "person_id",
+                Integer,
+                ForeignKey("persons.person_id"),
+                primary_key=True,
+            ),
+            Column("employee_data", String(50)),
+            Column("manager_data", String(50)),
+        )
+
+    def test_single_on_joined(self):
+        class Person(fixtures.ComparableEntity):
+            pass
+
+        class Employee(Person):
+            pass
+
+        class Manager(Employee):
+            pass
+
+        self.mapper_registry.map_imperatively(
+            Person,
+            persons_table,
+            polymorphic_on=persons_table.c.type,
+            polymorphic_identity="person",
+        )
+        self.mapper_registry.map_imperatively(
+            Employee,
+            employees_table,
+            inherits=Person,
+            polymorphic_identity="engineer",
+        )
+        self.mapper_registry.map_imperatively(
+            Manager, inherits=Employee, polymorphic_identity="manager"
+        )
+
+        sess = fixture_session()
+        sess.add(Person(name="p1"))
+        sess.add(Employee(name="e1", employee_data="ed1"))
+        sess.add(Manager(name="m1", employee_data="ed2", manager_data="md1"))
+        sess.flush()
+        sess.expunge_all()
+
+        eq_(
+            sess.query(Person).order_by(Person.person_id).all(),
+            [
+                Person(name="p1"),
+                Employee(name="e1", employee_data="ed1"),
+                Manager(name="m1", employee_data="ed2", manager_data="md1"),
+            ],
+        )
+        sess.expunge_all()
+
+        eq_(
+            sess.query(Employee).order_by(Person.person_id).all(),
+            [
+                Employee(name="e1", employee_data="ed1"),
+                Manager(name="m1", employee_data="ed2", manager_data="md1"),
+            ],
+        )
+        sess.expunge_all()
+
+        eq_(
+            sess.query(Manager).order_by(Person.person_id).all(),
+            [Manager(name="m1", employee_data="ed2", manager_data="md1")],
+        )
+        sess.expunge_all()
+
+        def go():
+            with expect_deprecated_20(with_polymorphic_dep):
+                eq_(
+                    sess.query(Person)
+                    .with_polymorphic("*")
+                    .order_by(Person.person_id)
+                    .all(),
+                    [
+                        Person(name="p1"),
+                        Employee(name="e1", employee_data="ed1"),
+                        Manager(
+                            name="m1", employee_data="ed2", manager_data="md1"
+                        ),
+                    ],
+                )
+
+        self.assert_sql_count(testing.db, go, 1)
+
+
+class SingleFromPolySelectableTest(
+    fixtures.DeclarativeMappedTest, AssertsCompiledSQL
+):
+    __dialect__ = "default"
+
+    @classmethod
+    def setup_classes(cls, with_polymorphic=None, include_sub_defaults=False):
+        Base = cls.DeclarativeBasic
+
+        class Employee(Base):
+            __tablename__ = "employee"
+            id = Column(Integer, primary_key=True)
+            name = Column(String(50))
+            type = Column(String(50))
+
+            __mapper_args__ = {
+                "polymorphic_identity": "employee",
+                "polymorphic_on": type,
+            }
+
+        class Engineer(Employee):
+            __tablename__ = "engineer"
+            id = Column(Integer, ForeignKey("employee.id"), primary_key=True)
+            engineer_info = Column(String(50))
+            manager_id = Column(ForeignKey("manager.id"))
+            __mapper_args__ = {"polymorphic_identity": "engineer"}
+
+        class Manager(Employee):
+            __tablename__ = "manager"
+            id = Column(Integer, ForeignKey("employee.id"), primary_key=True)
+            manager_data = Column(String(50))
+            __mapper_args__ = {"polymorphic_identity": "manager"}
+
+        class Boss(Manager):
+            __mapper_args__ = {"polymorphic_identity": "boss"}
+
+    def _with_poly_fixture(self):
+        employee = self.classes.Employee.__table__
+        engineer = self.classes.Engineer.__table__
+        manager = self.classes.Manager.__table__
+
+        poly = (
+            select(
+                employee.c.id,
+                employee.c.type,
+                employee.c.name,
+                manager.c.manager_data,
+                null().label("engineer_info"),
+                null().label("manager_id"),
+            )
+            .select_from(employee.join(manager))
+            .set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
+            .union_all(
+                select(
+                    employee.c.id,
+                    employee.c.type,
+                    employee.c.name,
+                    null().label("manager_data"),
+                    engineer.c.engineer_info,
+                    engineer.c.manager_id,
+                )
+                .select_from(employee.join(engineer))
+                .set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
+            )
+            .alias()
+        )
+
+        return poly
+
+    def test_query_wpoly_single_inh_subclass(self):
+        Boss = self.classes.Boss
+
+        poly = self._with_poly_fixture()
+
+        s = fixture_session()
+
+        with expect_deprecated_20(with_polymorphic_dep):
+            q = s.query(Boss).with_polymorphic(Boss, poly)
+        self.assert_compile(
+            q,
+            "SELECT anon_1.employee_id AS anon_1_employee_id, "
+            "anon_1.employee_name AS anon_1_employee_name, "
+            "anon_1.employee_type AS anon_1_employee_type, "
+            "anon_1.manager_manager_data AS anon_1_manager_manager_data "
+            "FROM (SELECT employee.id AS employee_id, employee.type "
+            "AS employee_type, employee.name AS employee_name, "
+            "manager.manager_data AS manager_manager_data, "
+            "NULL AS engineer_info, NULL AS manager_id FROM employee "
+            "JOIN manager ON employee.id = manager.id "
+            "UNION ALL SELECT employee.id AS employee_id, "
+            "employee.type AS employee_type, employee.name AS employee_name, "
+            "NULL AS manager_data, "
+            "engineer.engineer_info AS engineer_engineer_info, "
+            "engineer.manager_id AS engineer_manager_id "
+            "FROM employee JOIN engineer ON employee.id = engineer.id) "
+            "AS anon_1 WHERE anon_1.employee_type IN ([POSTCOMPILE_type_1])",
+        )
+
+
+class SameNamedPropTwoPolymorphicSubClassesTest(fixtures.MappedTest):
+    """test pathing when two subclasses contain a different property
+    for the same name, and polymorphic loading is used.
+
+    #2614
+
+    """
+
+    run_setup_classes = "once"
+    run_setup_mappers = "once"
+    run_inserts = "once"
+    run_deletes = None
+
+    @classmethod
+    def define_tables(cls, metadata):
+        Table(
+            "a",
+            metadata,
+            Column(
+                "id", Integer, primary_key=True, test_needs_autoincrement=True
+            ),
+            Column("type", String(10)),
+        )
+        Table(
+            "b",
+            metadata,
+            Column("id", Integer, ForeignKey("a.id"), primary_key=True),
+        )
+        Table(
+            "btod",
+            metadata,
+            Column("bid", Integer, ForeignKey("b.id"), nullable=False),
+            Column("did", Integer, ForeignKey("d.id"), nullable=False),
+        )
+        Table(
+            "c",
+            metadata,
+            Column("id", Integer, ForeignKey("a.id"), primary_key=True),
+        )
+        Table(
+            "ctod",
+            metadata,
+            Column("cid", Integer, ForeignKey("c.id"), nullable=False),
+            Column("did", Integer, ForeignKey("d.id"), nullable=False),
+        )
+        Table(
+            "d",
+            metadata,
+            Column(
+                "id", Integer, primary_key=True, test_needs_autoincrement=True
+            ),
+        )
+
+    @classmethod
+    def setup_classes(cls):
+        class A(cls.Comparable):
+            pass
+
+        class B(A):
+            pass
+
+        class C(A):
+            pass
+
+        class D(cls.Comparable):
+            pass
+
+    @classmethod
+    def setup_mappers(cls):
+        A = cls.classes.A
+        B = cls.classes.B
+        C = cls.classes.C
+        D = cls.classes.D
+
+        cls.mapper_registry.map_imperatively(
+            A, cls.tables.a, polymorphic_on=cls.tables.a.c.type
+        )
+        cls.mapper_registry.map_imperatively(
+            B,
+            cls.tables.b,
+            inherits=A,
+            polymorphic_identity="b",
+            properties={"related": relationship(D, secondary=cls.tables.btod)},
+        )
+        cls.mapper_registry.map_imperatively(
+            C,
+            cls.tables.c,
+            inherits=A,
+            polymorphic_identity="c",
+            properties={"related": relationship(D, secondary=cls.tables.ctod)},
+        )
+        cls.mapper_registry.map_imperatively(D, cls.tables.d)
+
+    @classmethod
+    def insert_data(cls, connection):
+        B = cls.classes.B
+        C = cls.classes.C
+        D = cls.classes.D
+
+        session = Session(connection)
+
+        d = D()
+        session.add_all([B(related=[d]), C(related=[d])])
+        session.commit()
+
+    def test_fixed_w_poly_subquery(self):
+        A = self.classes.A
+        B = self.classes.B
+        C = self.classes.C
+        D = self.classes.D
+
+        session = fixture_session()
+        d = session.query(D).one()
+
+        def go():
+            # NOTE: subqueryload is broken for this case, first found
+            # when cartesian product detection was added.
+            with expect_deprecated_20(with_polymorphic_dep):
+                for a in (
+                    session.query(A)
+                    .with_polymorphic([B, C])
+                    .options(selectinload(B.related), selectinload(C.related))
+                ):
+                    eq_(a.related, [d])
+
+        self.assert_sql_count(testing.db, go, 3)
+
+    def test_fixed_w_poly_joined(self):
+        A = self.classes.A
+        B = self.classes.B
+        C = self.classes.C
+        D = self.classes.D
+
+        session = fixture_session()
+        d = session.query(D).one()
+
+        def go():
+            with expect_deprecated_20(with_polymorphic_dep):
+                for a in (
+                    session.query(A)
+                    .with_polymorphic([B, C])
+                    .options(joinedload(B.related), joinedload(C.related))
+                ):
+                    eq_(a.related, [d])
+
+        self.assert_sql_count(testing.db, go, 1)
+
+
+class ConcreteTest(fixtures.MappedTest):
+    @classmethod
+    def define_tables(cls, metadata):
+        Table(
+            "companies",
+            metadata,
+            Column(
+                "id", Integer, primary_key=True, test_needs_autoincrement=True
+            ),
+            Column("name", String(50)),
+        )
+        Table(
+            "employees",
+            metadata,
+            Column(
+                "employee_id",
+                Integer,
+                primary_key=True,
+                test_needs_autoincrement=True,
+            ),
+            Column("name", String(50)),
+            Column("company_id", Integer, ForeignKey("companies.id")),
+        )
+        Table(
+            "managers",
+            metadata,
+            Column(
+                "employee_id",
+                Integer,
+                primary_key=True,
+                test_needs_autoincrement=True,
+            ),
+            Column("name", String(50)),
+            Column("manager_data", String(50)),
+            Column("company_id", Integer, ForeignKey("companies.id")),
+        )
+        Table(
+            "engineers",
+            metadata,
+            Column(
+                "employee_id",
+                Integer,
+                primary_key=True,
+                test_needs_autoincrement=True,
+            ),
+            Column("name", String(50)),
+            Column("engineer_info", String(50)),
+            Column("company_id", Integer, ForeignKey("companies.id")),
+        )
+        Table(
+            "hackers",
+            metadata,
+            Column(
+                "employee_id",
+                Integer,
+                primary_key=True,
+                test_needs_autoincrement=True,
+            ),
+            Column("name", String(50)),
+            Column("engineer_info", String(50)),
+            Column("company_id", Integer, ForeignKey("companies.id")),
+            Column("nickname", String(50)),
+        )
+
+    @classmethod
+    def setup_classes(cls):
+        class Employee(cls.Basic):
+            def __init__(self, name):
+                self.name = name
+
+            def __repr__(self):
+                return self.__class__.__name__ + " " + self.name
+
+        class Manager(Employee):
+            def __init__(self, name, manager_data):
+                self.name = name
+                self.manager_data = manager_data
+
+            def __repr__(self):
+                return (
+                    self.__class__.__name__
+                    + " "
+                    + self.name
+                    + " "
+                    + self.manager_data
+                )
+
+        class Engineer(Employee):
+            def __init__(self, name, engineer_info):
+                self.name = name
+                self.engineer_info = engineer_info
+
+            def __repr__(self):
+                return (
+                    self.__class__.__name__
+                    + " "
+                    + self.name
+                    + " "
+                    + self.engineer_info
+                )
+
+        class Hacker(Engineer):
+            def __init__(self, name, nickname, engineer_info):
+                self.name = name
+                self.nickname = nickname
+                self.engineer_info = engineer_info
+
+            def __repr__(self):
+                return (
+                    self.__class__.__name__
+                    + " "
+                    + self.name
+                    + " '"
+                    + self.nickname
+                    + "' "
+                    + self.engineer_info
+                )
+
+        class Company(cls.Basic):
+            pass
+
+    def test_without_default_polymorphic(self):
+        Employee, Engineer, Manager = self.classes(
+            "Employee", "Engineer", "Manager"
+        )
+        (Hacker,) = self.classes("Hacker")
+        (employees_table,) = self.tables("employees")
+        engineers_table, managers_table = self.tables("engineers", "managers")
+        (hackers_table,) = self.tables("hackers")
+
+        pjoin = polymorphic_union(
+            {
+                "employee": employees_table,
+                "manager": managers_table,
+                "engineer": engineers_table,
+                "hacker": hackers_table,
+            },
+            "type",
+            "pjoin",
+        )
+        pjoin2 = polymorphic_union(
+            {"engineer": engineers_table, "hacker": hackers_table},
+            "type",
+            "pjoin2",
+        )
+        employee_mapper = self.mapper_registry.map_imperatively(
+            Employee, employees_table, polymorphic_identity="employee"
+        )
+        self.mapper_registry.map_imperatively(
+            Manager,
+            managers_table,
+            inherits=employee_mapper,
+            concrete=True,
+            polymorphic_identity="manager",
+        )
+        engineer_mapper = self.mapper_registry.map_imperatively(
+            Engineer,
+            engineers_table,
+            inherits=employee_mapper,
+            concrete=True,
+            polymorphic_identity="engineer",
+        )
+        self.mapper_registry.map_imperatively(
+            Hacker,
+            hackers_table,
+            inherits=engineer_mapper,
+            concrete=True,
+            polymorphic_identity="hacker",
+        )
+        session = fixture_session()
+        jdoe = Employee("Jdoe")
+        tom = Manager("Tom", "knows how to manage things")
+        jerry = Engineer("Jerry", "knows how to program")
+        hacker = Hacker("Kurt", "Badass", "knows how to hack")
+        session.add_all((jdoe, tom, jerry, hacker))
+        session.flush()
+
+        with expect_deprecated_20(with_polymorphic_dep):
+            eq_(
+                len(
+                    session.connection()
+                    .execute(
+                        session.query(Employee)
+                        .with_polymorphic("*", pjoin, pjoin.c.type)
+                        .statement
+                    )
+                    .fetchall()
+                ),
+                4,
+            )
+        eq_(session.get(Employee, jdoe.employee_id), jdoe)
+        eq_(session.get(Engineer, jerry.employee_id), jerry)
+        with expect_deprecated_20(with_polymorphic_dep):
+            eq_(
+                set(
+                    [
+                        repr(x)
+                        for x in session.query(Employee).with_polymorphic(
+                            "*", pjoin, pjoin.c.type
+                        )
+                    ]
+                ),
+                set(
+                    [
+                        "Employee Jdoe",
+                        "Engineer Jerry knows how to program",
+                        "Manager Tom knows how to manage things",
+                        "Hacker Kurt 'Badass' knows how to hack",
+                    ]
+                ),
+            )
+        eq_(
+            set([repr(x) for x in session.query(Manager)]),
+            set(["Manager Tom knows how to manage things"]),
+        )
+        with expect_deprecated_20(with_polymorphic_dep):
+            eq_(
+                set(
+                    [
+                        repr(x)
+                        for x in session.query(Engineer).with_polymorphic(
+                            "*", pjoin2, pjoin2.c.type
+                        )
+                    ]
+                ),
+                set(
+                    [
+                        "Engineer Jerry knows how to program",
+                        "Hacker Kurt 'Badass' knows how to hack",
+                    ]
+                ),
+            )
+        eq_(
+            set([repr(x) for x in session.query(Hacker)]),
+            set(["Hacker Kurt 'Badass' knows how to hack"]),
+        )
+
+        # test adaption of the column by wrapping the query in a
+        # subquery
+
+        with testing.expect_deprecated(
+            r"The Query.from_self\(\) method", with_polymorphic_dep
+        ):
+            eq_(
+                len(
+                    session.connection()
+                    .execute(
+                        session.query(Engineer)
+                        .with_polymorphic("*", pjoin2, pjoin2.c.type)
+                        .from_self()
+                        .statement
+                    )
+                    .fetchall()
+                ),
+                2,
+            )
+        with testing.expect_deprecated(
+            r"The Query.from_self\(\) method", with_polymorphic_dep
+        ):
+            eq_(
+                set(
+                    [
+                        repr(x)
+                        for x in session.query(Engineer)
+                        .with_polymorphic("*", pjoin2, pjoin2.c.type)
+                        .from_self()
+                    ]
+                ),
+                set(
+                    [
+                        "Engineer Jerry knows how to program",
+                        "Hacker Kurt 'Badass' knows how to hack",
+                    ]
+                ),
+            )
index 2e52e162bbb7826fad690221a2a851ad79c103e6..60235bd86cd28d53f333449903bf9c6976723d23 100644 (file)
@@ -132,18 +132,25 @@ class _PolymorphicTestBase(fixtures.NoCache):
         count = {"": 14, "Polymorphic": 7}.get(self.select_type, 8)
         self.assert_sql_count(testing.db, go, count)
 
-    def test_primary_eager_aliasing_three(self):
+    def test_primary_eager_aliasing_three_reset_selectable(self):
+        """test now related to #7262
 
+        See test_primary_eager_aliasing_three_dont_reset_selectable for the
+        non-reset selectable version.
+
+        """
         # assert the JOINs don't over JOIN
 
         sess = fixture_session()
 
+        # note selectable=None
+        wp = with_polymorphic(Person, "*", None)
+
         def go():
             eq_(
-                sess.query(Person)
-                .with_polymorphic("*")
-                .order_by(Person.person_id)
-                .options(joinedload(Engineer.machines))[1:3],
+                sess.query(wp)
+                .order_by(wp.person_id)
+                .options(joinedload(wp.Engineer.machines))[1:3],
                 all_employees[1:3],
             )
 
@@ -152,10 +159,9 @@ class _PolymorphicTestBase(fixtures.NoCache):
         eq_(
             sess.scalar(
                 select(func.count("*")).select_from(
-                    sess.query(Person)
-                    .with_polymorphic("*")
-                    .options(joinedload(Engineer.machines))
-                    .order_by(Person.person_id)
+                    sess.query(wp)
+                    .options(joinedload(wp.Engineer.machines))
+                    .order_by(wp.person_id)
                     .limit(2)
                     .offset(1)
                     .subquery()
@@ -452,18 +458,6 @@ class _PolymorphicTestBase(fixtures.NoCache):
             [m1],
         )
 
-    def test_join_from_with_polymorphic_nonaliased_one(self):
-        sess = fixture_session()
-        eq_(
-            sess.query(Person)
-            .with_polymorphic(Manager)
-            .order_by(Person.person_id)
-            .join(Person.paperwork)
-            .filter(Paperwork.description.like("%review%"))
-            .all(),
-            [b1, m1],
-        )
-
     def test_join_from_with_polymorphic_nonaliased_one_future(self):
         sess = fixture_session(future=True)
 
@@ -481,66 +475,99 @@ class _PolymorphicTestBase(fixtures.NoCache):
             [b1, m1],
         )
 
-    def test_join_from_with_polymorphic_nonaliased_two(self):
+    def test_join_from_with_polymorphic_nonaliased_two_future(self):
         sess = fixture_session()
+
+        wp = with_polymorphic(Person, [Manager, Engineer])
         eq_(
-            sess.query(Person)
-            .with_polymorphic([Manager, Engineer])
-            .order_by(Person.person_id)
-            .join(Person.paperwork)
+            sess.query(wp)
+            .order_by(wp.person_id)
+            .join(wp.paperwork)
             .filter(Paperwork.description.like("%#2%"))
             .all(),
             [e1, m1],
         )
 
-    def test_join_from_with_polymorphic_nonaliased_three(self):
+    def test_join_from_with_polymorphic_nonaliased_three_future(self):
         sess = fixture_session()
+
+        wp = with_polymorphic(Person, [Manager, Engineer])
         eq_(
-            sess.query(Person)
-            .with_polymorphic([Manager, Engineer])
-            .order_by(Person.person_id)
-            .join(Person.paperwork)
-            .filter(Person.name.like("%dog%"))
+            sess.query(wp)
+            .order_by(wp.person_id)
+            .join(wp.paperwork)
+            .filter(wp.name.like("%dog%"))
             .filter(Paperwork.description.like("%#2%"))
             .all(),
             [m1],
         )
 
-    def test_join_from_with_polymorphic_explicit_aliased_one(self):
+    def test_join_from_with_polymorphic_explicit_aliased_one_future(self):
         sess = fixture_session()
         pa = aliased(Paperwork)
+        wp = with_polymorphic(Person, [Manager])
+
         eq_(
-            sess.query(Person)
-            .with_polymorphic(Manager)
-            .join(pa, Person.paperwork)
+            sess.query(wp)
+            .join(pa, wp.paperwork)
             .filter(pa.description.like("%review%"))
             .all(),
             [b1, m1],
         )
 
-    def test_join_from_with_polymorphic_explicit_aliased_two(self):
+    def test_join_from_with_polymorphic_explicit_aliased_two_future(self):
         sess = fixture_session()
         pa = aliased(Paperwork)
+
+        wp = with_polymorphic(Person, [Manager, Engineer])
         eq_(
-            sess.query(Person)
-            .with_polymorphic([Manager, Engineer])
-            .order_by(Person.person_id)
-            .join(pa, Person.paperwork)
+            sess.query(wp)
+            .order_by(wp.person_id)
+            .join(pa, wp.paperwork)
             .filter(pa.description.like("%#2%"))
             .all(),
             [e1, m1],
         )
 
-    def test_join_from_with_polymorphic_aliased_three(self):
+    def test_join_from_with_polymorphic_ot_explicit_aliased_two_future(self):
         sess = fixture_session()
         pa = aliased(Paperwork)
 
+        wp = with_polymorphic(Person, [Manager, Engineer])
         eq_(
-            sess.query(Person)
-            .with_polymorphic([Manager, Engineer])
-            .order_by(Person.person_id)
-            .join(pa, Person.paperwork)
-            .filter(Person.name.like("%dog%"))
+            sess.query(wp)
+            .order_by(wp.person_id)
+            .join(wp.paperwork.of_type(pa))
+            .filter(pa.description.like("%#2%"))
+            .all(),
+            [e1, m1],
+        )
+
+    def test_join_from_with_polymorphic_aliased_three_future(self):
+        sess = fixture_session()
+        pa = aliased(Paperwork)
+        wp = with_polymorphic(Person, [Manager, Engineer])
+
+        eq_(
+            sess.query(wp)
+            .order_by(wp.person_id)
+            .join(pa, wp.paperwork)
+            .filter(wp.name.like("%dog%"))
+            .filter(pa.description.like("%#2%"))
+            .all(),
+            [m1],
+        )
+
+    def test_join_from_with_polymorphic_ot_aliased_three_future(self):
+        sess = fixture_session()
+        pa = aliased(Paperwork)
+        wp = with_polymorphic(Person, [Manager, Engineer])
+
+        eq_(
+            sess.query(wp)
+            .order_by(wp.person_id)
+            .join(wp.paperwork.of_type(pa))
+            .filter(wp.name.like("%dog%"))
             .filter(pa.description.like("%#2%"))
             .all(),
             [m1],
@@ -820,103 +847,109 @@ class _PolymorphicTestBase(fixtures.NoCache):
         sess.expire(m2, ["manager_name", "golf_swing"])
         assert m2.golf_swing == "fore"
 
-    def test_with_polymorphic_one(self):
+    def test_with_polymorphic_one_future(self):
         sess = fixture_session()
 
         def go():
+            wp = with_polymorphic(Person, [Engineer])
             eq_(
-                sess.query(Person)
-                .with_polymorphic(Engineer)
-                .filter(Engineer.primary_language == "java")
+                sess.query(wp)
+                .filter(wp.Engineer.primary_language == "java")
                 .all(),
                 self._emps_wo_relationships_fixture()[0:1],
             )
 
         self.assert_sql_count(testing.db, go, 1)
 
-    def test_with_polymorphic_two(self):
+    def test_with_polymorphic_two_future_adhoc_wp(self):
+        """test #7262
+
+        compare to
+        test_with_polymorphic_two_future_default_wp
+
+        """
+
         sess = fixture_session()
 
         def go():
+
+            wp = with_polymorphic(Person, "*", selectable=None)
             eq_(
-                sess.query(Person)
-                .with_polymorphic("*")
-                .order_by(Person.person_id)
-                .all(),
+                sess.query(wp).order_by(wp.person_id).all(),
                 self._emps_wo_relationships_fixture(),
             )
 
         self.assert_sql_count(testing.db, go, 1)
 
-    def test_with_polymorphic_three(self):
+    def test_with_polymorphic_three_future(self):
         sess = fixture_session()
 
         def go():
+            wp = with_polymorphic(Person, [Engineer])
+
             eq_(
-                sess.query(Person)
-                .with_polymorphic(Engineer)
-                .order_by(Person.person_id)
-                .all(),
+                sess.query(wp).order_by(wp.person_id).all(),
                 self._emps_wo_relationships_fixture(),
             )
 
         self.assert_sql_count(testing.db, go, 3)
 
-    def test_with_polymorphic_four(self):
+    def test_with_polymorphic_four_future(self):
         sess = fixture_session()
 
         def go():
+            wp = with_polymorphic(
+                Person, Engineer, selectable=people.outerjoin(engineers)
+            )
             eq_(
-                sess.query(Person)
-                .with_polymorphic(Engineer, people.outerjoin(engineers))
-                .order_by(Person.person_id)
-                .all(),
+                sess.query(wp).order_by(wp.person_id).all(),
                 self._emps_wo_relationships_fixture(),
             )
 
         self.assert_sql_count(testing.db, go, 3)
 
-    def test_with_polymorphic_five(self):
+    def test_with_polymorphic_five_future_override_selectable(self):
+        """test part of #7262
+
+        this is kind of a hack though, people wouldn't know to do this
+        this way.
+
+        """
         sess = fixture_session()
 
         def go():
+            # needs both [Person] and the selectable=None part
+            # TODO: why do we need [Person] and can't send []? possible
+            # bug
+            wp = with_polymorphic(Person, [Person], selectable=None)
+
             # limit the polymorphic join down to just "Person",
             # overriding select_table
             eq_(
-                sess.query(Person).with_polymorphic(Person).all(),
+                sess.query(wp).all(),
                 self._emps_wo_relationships_fixture(),
             )
 
         self.assert_sql_count(testing.db, go, 6)
 
-    def test_with_polymorphic_six(self):
-        sess = fixture_session()
-
+    def test_with_polymorphic_six_future(self):
         assert_raises(
-            sa_exc.InvalidRequestError,
-            sess.query(Person).with_polymorphic,
-            Paperwork,
+            sa_exc.InvalidRequestError, with_polymorphic, Person, [Paperwork]
         )
         assert_raises(
-            sa_exc.InvalidRequestError,
-            sess.query(Engineer).with_polymorphic,
-            Boss,
+            sa_exc.InvalidRequestError, with_polymorphic, Engineer, [Boss]
         )
         assert_raises(
-            sa_exc.InvalidRequestError,
-            sess.query(Engineer).with_polymorphic,
-            Person,
+            sa_exc.InvalidRequestError, with_polymorphic, Engineer, [Person]
         )
 
-    def test_with_polymorphic_seven(self):
+    def test_with_polymorphic_seven_future(self):
         sess = fixture_session()
         # compare to entities without related collections to prevent
         # additional lazy SQL from firing on loaded entities
+        wp = with_polymorphic(Person, "*")
         eq_(
-            sess.query(Person)
-            .with_polymorphic("*")
-            .order_by(Person.person_id)
-            .all(),
+            sess.query(wp).order_by(wp.person_id).all(),
             self._emps_wo_relationships_fixture(),
         )
 
@@ -1009,11 +1042,11 @@ class _PolymorphicTestBase(fixtures.NoCache):
 
         def go():
             # test load People with joinedload to engineers + machines
+            wp = with_polymorphic(Person, "*")
             eq_(
-                sess.query(Person)
-                .with_polymorphic("*")
-                .options(joinedload(Engineer.machines))
-                .filter(Person.name == "dilbert")
+                sess.query(wp)
+                .options(joinedload(wp.Engineer.machines))
+                .filter(wp.name == "dilbert")
                 .all(),
                 expected,
             )
@@ -1045,19 +1078,6 @@ class _PolymorphicTestBase(fixtures.NoCache):
                 expected,
             )
 
-            # the old version of this test has never worked, apparently,
-            # was always spitting out a cartesian product.  Since we
-            # are getting rid of query.with_polymorphic() is it not
-            # worth fixing.
-            # eq_(
-            #    sess.query(Person)
-            #    .with_polymorphic("*")
-            #    .options(subqueryload(Engineer.machines))
-            #    .filter(Person.name == "dilbert")
-            #    .all(),
-            #    expected,
-            # )
-
         self.assert_sql_count(testing.db, go, 2)
 
     def test_query_subclass_join_to_base_relationship(self):
@@ -2014,6 +2034,63 @@ class _PolymorphicTestBase(fixtures.NoCache):
 
 
 class PolymorphicTest(_PolymorphicTestBase, _Polymorphic):
+    def test_primary_eager_aliasing_three_dont_reset_selectable(self):
+        """test now related to #7262
+
+        See test_primary_eager_aliasing_three_reset_selectable for
+        the reset selectable version.
+
+        """
+        # assert the JOINs don't over JOIN
+
+        sess = fixture_session()
+
+        # selectable default is False
+        wp = with_polymorphic(Person, "*")
+
+        def go():
+            eq_(
+                sess.query(wp)
+                .order_by(wp.person_id)
+                .options(joinedload(wp.Engineer.machines))[1:3],
+                all_employees[1:3],
+            )
+
+        self.assert_sql_count(testing.db, go, 3)
+
+        eq_(
+            sess.scalar(
+                select(func.count("*")).select_from(
+                    sess.query(wp)
+                    .options(joinedload(wp.Engineer.machines))
+                    .order_by(wp.person_id)
+                    .limit(2)
+                    .offset(1)
+                    .subquery()
+                )
+            ),
+            2,
+        )
+
+    def test_with_polymorphic_two_future_default_wp(self):
+        """test #7262
+
+        compare to
+        test_with_polymorphic_two_future_adhoc_wp
+
+        """
+        sess = fixture_session()
+
+        def go():
+
+            wp = with_polymorphic(Person, "*")
+            eq_(
+                sess.query(wp).order_by(wp.person_id).all(),
+                self._emps_wo_relationships_fixture(),
+            )
+
+        self.assert_sql_count(testing.db, go, 1)
+
     def test_join_to_subclass_four(self):
         sess = fixture_session()
         eq_(
@@ -2094,6 +2171,25 @@ class PolymorphicPolymorphicTest(
 ):
     __dialect__ = "default"
 
+    def test_with_polymorphic_two_future_default_wp(self):
+        """test #7262
+
+        compare to
+        test_with_polymorphic_two_future_adhoc_wp
+
+        """
+        sess = fixture_session()
+
+        def go():
+
+            wp = with_polymorphic(Person, "*")
+            eq_(
+                sess.query(wp).order_by(wp.person_id).all(),
+                self._emps_wo_relationships_fixture(),
+            )
+
+        self.assert_sql_count(testing.db, go, 1)
+
     def test_aliased_not_polluted_by_join(self):
         # aliased(polymorphic) will normally do the old-school
         # "(SELECT * FROM a JOIN b ...) AS anon_1" thing.
@@ -2188,6 +2284,25 @@ class PolymorphicPolymorphicTest(
 
 
 class PolymorphicUnionsTest(_PolymorphicTestBase, _PolymorphicUnions):
+    def test_with_polymorphic_two_future_default_wp(self):
+        """test #7262
+
+        compare to
+        test_with_polymorphic_two_future_adhoc_wp
+
+        """
+        sess = fixture_session()
+
+        def go():
+
+            wp = with_polymorphic(Person, "*")
+            eq_(
+                sess.query(wp).order_by(wp.person_id).all(),
+                self._emps_wo_relationships_fixture(),
+            )
+
+        self.assert_sql_count(testing.db, go, 2)
+
     def test_subqueryload_on_subclass_uses_path_correctly(self):
         sess = fixture_session()
         expected = [
@@ -2270,10 +2385,46 @@ class PolymorphicUnionsTest(_PolymorphicTestBase, _PolymorphicUnions):
 class PolymorphicAliasedJoinsTest(
     _PolymorphicTestBase, _PolymorphicAliasedJoins
 ):
-    pass
+    def test_with_polymorphic_two_future_default_wp(self):
+        """test #7262
+
+        compare to
+        test_with_polymorphic_two_future_adhoc_wp
+
+        """
+        sess = fixture_session()
+
+        def go():
+
+            wp = with_polymorphic(Person, "*")
+            eq_(
+                sess.query(wp).order_by(wp.person_id).all(),
+                self._emps_wo_relationships_fixture(),
+            )
+
+        self.assert_sql_count(testing.db, go, 2)
 
 
 class PolymorphicJoinsTest(_PolymorphicTestBase, _PolymorphicJoins):
+    def test_with_polymorphic_two_future_default_wp(self):
+        """test #7262
+
+        compare to
+        test_with_polymorphic_two_future_adhoc_wp
+
+        """
+        sess = fixture_session()
+
+        def go():
+
+            wp = with_polymorphic(Person, "*")
+            eq_(
+                sess.query(wp).order_by(wp.person_id).all(),
+                self._emps_wo_relationships_fixture(),
+            )
+
+        self.assert_sql_count(testing.db, go, 2)
+
     def test_having_group_by(self):
         sess = fixture_session()
         eq_(
index 881353e3bb7d4b169bbff8ef95f81629b36aec06..eeb3a7ed6366a107c16753d4519494974efbe8b1 100644 (file)
@@ -11,7 +11,6 @@ from sqlalchemy.orm import configure_mappers
 from sqlalchemy.orm import contains_eager
 from sqlalchemy.orm import joinedload
 from sqlalchemy.orm import relationship
-from sqlalchemy.orm import selectinload
 from sqlalchemy.orm import Session
 from sqlalchemy.orm import sessionmaker
 from sqlalchemy.orm import subqueryload
@@ -1471,27 +1470,6 @@ class SameNamedPropTwoPolymorphicSubClassesTest(fixtures.MappedTest):
 
         self.assert_sql_count(testing.db, go, 3)
 
-    def test_fixed_w_poly_subquery(self):
-        A = self.classes.A
-        B = self.classes.B
-        C = self.classes.C
-        D = self.classes.D
-
-        session = fixture_session()
-        d = session.query(D).one()
-
-        def go():
-            # NOTE: subqueryload is broken for this case, first found
-            # when cartesian product detection was added.
-            for a in (
-                session.query(A)
-                .with_polymorphic([B, C])
-                .options(selectinload(B.related), selectinload(C.related))
-            ):
-                eq_(a.related, [d])
-
-        self.assert_sql_count(testing.db, go, 3)
-
     def test_free_w_poly_joined(self):
         A = self.classes.A
         B = self.classes.B
@@ -1510,25 +1488,6 @@ class SameNamedPropTwoPolymorphicSubClassesTest(fixtures.MappedTest):
 
         self.assert_sql_count(testing.db, go, 1)
 
-    def test_fixed_w_poly_joined(self):
-        A = self.classes.A
-        B = self.classes.B
-        C = self.classes.C
-        D = self.classes.D
-
-        session = fixture_session()
-        d = session.query(D).one()
-
-        def go():
-            for a in (
-                session.query(A)
-                .with_polymorphic([B, C])
-                .options(joinedload(B.related), joinedload(C.related))
-            ):
-                eq_(a.related, [d])
-
-        self.assert_sql_count(testing.db, go, 1)
-
 
 class SubClassToSubClassFromParentTest(fixtures.MappedTest):
     """test #2617"""
index 025f717d7d81c6008413dae55a0ac211be4c9a41..30d4549c41bd2cf2a2d3d23d33c931c40a71c172 100644 (file)
@@ -1666,11 +1666,9 @@ class SingleOnJoinedTest(fixtures.MappedTest):
         sess.expunge_all()
 
         def go():
+            wp = with_polymorphic(Person, "*")
             eq_(
-                sess.query(Person)
-                .with_polymorphic("*")
-                .order_by(Person.person_id)
-                .all(),
+                sess.query(wp).order_by(wp.person_id).all(),
                 [
                     Person(name="p1"),
                     Employee(name="e1", employee_data="ed1"),
@@ -1791,7 +1789,9 @@ class SingleFromPolySelectableTest(
         poly = self._with_poly_fixture()
 
         s = fixture_session()
-        q = s.query(Boss).with_polymorphic(Boss, poly)
+
+        wp = with_polymorphic(Boss, [], poly)
+        q = s.query(wp)
         self.assert_compile(
             q,
             "SELECT anon_1.employee_id AS anon_1_employee_id, "
index d689bd6bad126acf95543680c02c8bcc9f9ca21d..7fb232b0b8761b53e71e3816581ff6c8a4372992 100644 (file)
@@ -581,13 +581,6 @@ class PolyCacheKeyTest(CacheKeyFixture, _poly_fixtures._Polymorphic):
             "Person", "Manager", "Engineer", "Boss"
         )
 
-        def one():
-            return (
-                fixture_session()
-                .query(Person)
-                .with_polymorphic([Manager, Engineer])
-            )
-
         def two():
             wp = with_polymorphic(Person, [Manager, Engineer])
 
@@ -603,14 +596,6 @@ class PolyCacheKeyTest(CacheKeyFixture, _poly_fixtures._Polymorphic):
 
             return fixture_session().query(wp).filter(wp.name == "asdfo")
 
-        def four():
-            return (
-                fixture_session()
-                .query(Person)
-                .with_polymorphic([Manager, Engineer])
-                .filter(Person.name == "asdf")
-            )
-
         def five():
             subq = (
                 select(Person)
@@ -622,25 +607,8 @@ class PolyCacheKeyTest(CacheKeyFixture, _poly_fixtures._Polymorphic):
 
             return fixture_session().query(wp).filter(wp.name == "asdfo")
 
-        def six():
-            subq = (
-                select(Person)
-                .outerjoin(Manager)
-                .outerjoin(Engineer)
-                .subquery()
-            )
-
-            return (
-                fixture_session()
-                .query(Person)
-                .with_polymorphic([Manager, Engineer], subq)
-                .filter(Person.name == "asdfo")
-            )
-
         self._run_cache_key_fixture(
-            lambda: stmt_20(
-                one(), two(), three(), three_a(), four(), five(), six()
-            ),
+            lambda: stmt_20(two(), three(), three_a(), five()),
             compare_values=True,
         )
 
index 5abce44983956c3786c1623600c5d29643a2f990..e670427ad11a9537aec1fc011d9442b4ed9f2bbd 100644 (file)
@@ -23,6 +23,7 @@ from sqlalchemy import text
 from sqlalchemy import true
 from sqlalchemy import util
 from sqlalchemy.engine import default
+from sqlalchemy.engine.base import Engine
 from sqlalchemy.orm import aliased
 from sqlalchemy.orm import as_declarative
 from sqlalchemy.orm import attributes
@@ -125,6 +126,15 @@ join_tuple_form = (
     "arguments in SQLAlchemy 2.0."
 )
 
+autocommit_dep = (
+    "The Session.autocommit parameter is deprecated "
+    "and will be removed in SQLAlchemy version 2.0."
+)
+
+subtransactions_dep = (
+    "The Session.begin.subtransactions flag is deprecated "
+    "and will be removed in SQLAlchemy version 2.0."
+)
 opt_strings_dep = (
     "Using strings to indicate column or relationship "
     "paths in loader options"
@@ -146,6 +156,11 @@ sef_dep = (
     "legacy as of the 1.x"
 )
 
+with_polymorphic_dep = (
+    r"The Query.with_polymorphic\(\) method is considered legacy as of "
+    r"the 1.x series of SQLAlchemy and will be removed in 2.0"
+)
+
 
 def _aliased_join_warning(arg=None):
     return testing.expect_warnings(
@@ -2772,19 +2787,13 @@ class SessionTest(fixtures.RemovesEvents, _LocalFixture):
         s1 = Session(testing.db)
         s1.begin()
 
-        with testing.expect_deprecated_20(
-            "The Session.begin.subtransactions flag is deprecated "
-            "and will be removed in SQLAlchemy version 2.0."
-        ):
+        with testing.expect_deprecated_20(subtransactions_dep):
             s1.begin(subtransactions=True)
 
         s1.close()
 
     def test_autocommit_deprecated(Self):
-        with testing.expect_deprecated_20(
-            "The Session.autocommit parameter is deprecated "
-            "and will be removed in SQLAlchemy version 2.0."
-        ):
+        with testing.expect_deprecated_20(autocommit_dep):
             Session(autocommit=True)
 
     @testing.combinations(
@@ -2922,6 +2931,124 @@ class SessionTest(fixtures.RemovesEvents, _LocalFixture):
         eq_(sess.query(User).count(), 1)
 
 
+class TransScopingTest(_fixtures.FixtureTest):
+    run_inserts = None
+    __prefer_requires__ = ("independent_connections",)
+
+    @testing.combinations((True,), (False,), argnames="begin")
+    @testing.combinations((True,), (False,), argnames="expire_on_commit")
+    @testing.combinations((True,), (False,), argnames="modify_unconditional")
+    @testing.combinations(
+        ("nothing",), ("modify",), ("add",), ("delete",), argnames="case_"
+    )
+    def test_autobegin_attr_change(
+        self, case_, begin, modify_unconditional, expire_on_commit
+    ):
+        """test :ticket:`6360`"""
+
+        autocommit = True
+        User, users = self.classes.User, self.tables.users
+
+        self.mapper_registry.map_imperatively(User, users)
+
+        with testing.expect_deprecated_20(autocommit_dep):
+            s = Session(
+                testing.db,
+                autocommit=autocommit,
+                expire_on_commit=expire_on_commit,
+            )
+
+        u = User(name="x")
+        u2 = User(name="d")
+        u3 = User(name="e")
+        s.add_all([u, u2, u3])
+
+        if autocommit:
+            s.flush()
+        else:
+            s.commit()
+
+        if begin:
+            s.begin()
+
+        if case_ == "add":
+            # this autobegins
+            s.add(User(name="q"))
+        elif case_ == "delete":
+            # this autobegins
+            s.delete(u2)
+        elif case_ == "modify":
+            # this autobegins
+            u3.name = "m"
+
+        if case_ == "nothing" and not begin:
+            assert not s._transaction
+            expect_expire = expire_on_commit
+        elif autocommit and not begin:
+            assert not s._transaction
+            expect_expire = expire_on_commit
+        else:
+            assert s._transaction
+            expect_expire = True
+
+        if modify_unconditional:
+            # this autobegins
+            u.name = "y"
+            expect_expire = True
+
+        if not expect_expire:
+            assert not s._transaction
+
+        # test is that state is consistent after rollback()
+        s.rollback()
+
+        if autocommit and not begin and modify_unconditional:
+            eq_(u.name, "y")
+        else:
+            if not expect_expire:
+                assert "name" in u.__dict__
+            else:
+                assert "name" not in u.__dict__
+            eq_(u.name, "x")
+
+    def test_no_autoflush_or_commit_in_expire_w_autocommit(self):
+        """test second part of :ticket:`6233`.
+
+        Here we test that the "autoflush on unexpire" feature added
+        in :ticket:`5226` is turned off for a legacy autocommit session.
+
+        """
+
+        with testing.expect_deprecated_20(autocommit_dep):
+            s = Session(
+                testing.db,
+                autocommit=True,
+                expire_on_commit=True,
+                autoflush=True,
+            )
+
+        User, users = self.classes.User, self.tables.users
+
+        self.mapper_registry.map_imperatively(User, users)
+
+        u1 = User(name="u1")
+        s.add(u1)
+        s.flush()  # this commits
+
+        u1.name = "u2"  # this does not commit
+
+        assert "id" not in u1.__dict__
+        u1.id  # this unexpires
+
+        # never expired
+        eq_(u1.__dict__["name"], "u2")
+
+        eq_(u1.name, "u2")
+
+        # still in dirty collection
+        assert u1 in s.dirty
+
+
 class AutocommitClosesOnFailTest(fixtures.MappedTest):
     __requires__ = ("deferrable_fks",)
 
@@ -2963,7 +3090,8 @@ class AutocommitClosesOnFailTest(fixtures.MappedTest):
     def test_close_transaction_on_commit_fail(self):
         T2 = self.classes.T2
 
-        session = Session(testing.db, autocommit=True)
+        with testing.expect_deprecated_20(autocommit_dep):
+            session = Session(testing.db, autocommit=True)
 
         # with a deferred constraint, this fails at COMMIT time instead
         # of at INSERT time.
@@ -4248,6 +4376,382 @@ class DistinctOrderByImplicitTest(QueryTest, AssertsCompiledSQL):
             )
 
 
+class AutoCommitTest(_LocalFixture):
+    __backend__ = True
+
+    def test_begin_nested_requires_trans(self):
+        with assertions.expect_deprecated_20(autocommit_dep):
+            sess = fixture_session(autocommit=True)
+        assert_raises(sa_exc.InvalidRequestError, sess.begin_nested)
+
+    def test_begin_preflush(self):
+        User = self.classes.User
+        with assertions.expect_deprecated_20(autocommit_dep):
+            sess = fixture_session(autocommit=True)
+
+        u1 = User(name="ed")
+        sess.add(u1)
+
+        sess.begin()
+        u2 = User(name="some other user")
+        sess.add(u2)
+        sess.rollback()
+        assert u2 not in sess
+        assert u1 in sess
+        assert sess.query(User).filter_by(name="ed").one() is u1
+
+    def test_accounting_commit_fails_add(self):
+        User = self.classes.User
+        with assertions.expect_deprecated_20(autocommit_dep):
+            sess = fixture_session(autocommit=True)
+
+        fail = False
+
+        def fail_fn(*arg, **kw):
+            if fail:
+                raise Exception("commit fails")
+
+        event.listen(sess, "after_flush_postexec", fail_fn)
+        u1 = User(name="ed")
+        sess.add(u1)
+
+        fail = True
+        assert_raises(Exception, sess.flush)
+        fail = False
+
+        assert u1 not in sess
+        u1new = User(id=2, name="fred")
+        sess.add(u1new)
+        sess.add(u1)
+        sess.flush()
+        assert u1 in sess
+        eq_(
+            sess.query(User.name).order_by(User.name).all(),
+            [("ed",), ("fred",)],
+        )
+
+    def test_accounting_commit_fails_delete(self):
+        User = self.classes.User
+        with assertions.expect_deprecated_20(autocommit_dep):
+            sess = fixture_session(autocommit=True)
+
+        fail = False
+
+        def fail_fn(*arg, **kw):
+            if fail:
+                raise Exception("commit fails")
+
+        event.listen(sess, "after_flush_postexec", fail_fn)
+        u1 = User(name="ed")
+        sess.add(u1)
+        sess.flush()
+
+        sess.delete(u1)
+        fail = True
+        assert_raises(Exception, sess.flush)
+        fail = False
+
+        assert u1 in sess
+        assert u1 not in sess.deleted
+        sess.delete(u1)
+        sess.flush()
+        assert u1 not in sess
+        eq_(sess.query(User.name).order_by(User.name).all(), [])
+
+    @testing.requires.updateable_autoincrement_pks
+    def test_accounting_no_select_needed(self):
+        """test that flush accounting works on non-expired instances
+        when autocommit=True/expire_on_commit=True."""
+
+        User = self.classes.User
+        with assertions.expect_deprecated_20(autocommit_dep):
+            sess = fixture_session(autocommit=True, expire_on_commit=True)
+
+        u1 = User(id=1, name="ed")
+        sess.add(u1)
+        sess.flush()
+
+        u1.id = 3
+        u1.name = "fred"
+        self.assert_sql_count(testing.db, sess.flush, 1)
+        assert "id" not in u1.__dict__
+        eq_(u1.id, 3)
+
+
+class SessionStateTest(_fixtures.FixtureTest):
+    run_inserts = None
+
+    __prefer_requires__ = ("independent_connections",)
+
+    def test_autocommit_doesnt_raise_on_pending(self):
+        User, users = self.classes.User, self.tables.users
+
+        self.mapper_registry.map_imperatively(User, users)
+        with assertions.expect_deprecated_20(autocommit_dep):
+            session = Session(testing.db, autocommit=True)
+
+        session.add(User(name="ed"))
+
+        session.begin()
+        session.flush()
+        session.commit()
+
+
+class SessionTransactionTest(fixtures.RemovesEvents, _fixtures.FixtureTest):
+    run_inserts = None
+    __backend__ = True
+
+    @testing.fixture
+    def conn(self):
+        with testing.db.connect() as conn:
+            yield conn
+
+    @testing.fixture
+    def future_conn(self):
+
+        engine = Engine._future_facade(testing.db)
+        with engine.connect() as conn:
+            yield conn
+
+    def test_deactive_status_check(self):
+        sess = fixture_session()
+        trans = sess.begin()
+
+        with assertions.expect_deprecated_20(subtransactions_dep):
+            trans2 = sess.begin(subtransactions=True)
+        trans2.rollback()
+        assert_raises_message(
+            sa_exc.InvalidRequestError,
+            "This session is in 'inactive' state, due to the SQL transaction "
+            "being rolled back; no further SQL can be emitted within this "
+            "transaction.",
+            trans.commit,
+        )
+
+    def test_deactive_status_check_w_exception(self):
+        sess = fixture_session()
+        trans = sess.begin()
+        with assertions.expect_deprecated_20(subtransactions_dep):
+            trans2 = sess.begin(subtransactions=True)
+        try:
+            raise Exception("test")
+        except Exception:
+            trans2.rollback(_capture_exception=True)
+        assert_raises_message(
+            sa_exc.PendingRollbackError,
+            r"This Session's transaction has been rolled back due to a "
+            r"previous exception during flush. To begin a new transaction "
+            r"with this Session, first issue Session.rollback\(\). "
+            r"Original exception was: test",
+            trans.commit,
+        )
+
+    def test_error_on_using_inactive_session_commands(self):
+        users, User = self.tables.users, self.classes.User
+
+        self.mapper_registry.map_imperatively(User, users)
+        with assertions.expect_deprecated_20(autocommit_dep):
+            sess = fixture_session(autocommit=True)
+        sess.begin()
+        with assertions.expect_deprecated_20(subtransactions_dep):
+            sess.begin(subtransactions=True)
+        sess.add(User(name="u1"))
+        sess.flush()
+        sess.rollback()
+        with assertions.expect_deprecated_20(subtransactions_dep):
+            assert_raises_message(
+                sa_exc.InvalidRequestError,
+                "This session is in 'inactive' state, due to the SQL "
+                "transaction "
+                "being rolled back; no further SQL can be emitted within this "
+                "transaction.",
+                sess.begin,
+                subtransactions=True,
+            )
+        sess.close()
+
+    def test_subtransaction_on_external_subtrans(self, conn):
+        users, User = self.tables.users, self.classes.User
+
+        self.mapper_registry.map_imperatively(User, users)
+
+        trans = conn.begin()
+        sess = Session(bind=conn, autocommit=False, autoflush=True)
+        with assertions.expect_deprecated_20(subtransactions_dep):
+            sess.begin(subtransactions=True)
+        u = User(name="ed")
+        sess.add(u)
+        sess.flush()
+        sess.commit()  # commit does nothing
+        trans.rollback()  # rolls back
+        assert len(sess.query(User).all()) == 0
+        sess.close()
+
+    def test_subtransaction_on_noautocommit(self):
+        User, users = self.classes.User, self.tables.users
+
+        self.mapper_registry.map_imperatively(User, users)
+        sess = fixture_session(autocommit=False, autoflush=True)
+        with assertions.expect_deprecated_20(subtransactions_dep):
+            sess.begin(subtransactions=True)
+        u = User(name="u1")
+        sess.add(u)
+        sess.flush()
+        sess.commit()  # commit does nothing
+        sess.rollback()  # rolls back
+        assert len(sess.query(User).all()) == 0
+        sess.close()
+
+    @testing.requires.savepoints
+    def test_heavy_nesting(self):
+        users = self.tables.users
+
+        session = fixture_session()
+        session.begin()
+        session.connection().execute(users.insert().values(name="user1"))
+        with assertions.expect_deprecated_20(subtransactions_dep):
+            session.begin(subtransactions=True)
+        session.begin_nested()
+        session.connection().execute(users.insert().values(name="user2"))
+        assert (
+            session.connection()
+            .exec_driver_sql("select count(1) from users")
+            .scalar()
+            == 2
+        )
+        session.rollback()
+        assert (
+            session.connection()
+            .exec_driver_sql("select count(1) from users")
+            .scalar()
+            == 1
+        )
+        session.connection().execute(users.insert().values(name="user3"))
+        session.commit()
+        assert (
+            session.connection()
+            .exec_driver_sql("select count(1) from users")
+            .scalar()
+            == 2
+        )
+
+    @testing.requires.savepoints
+    def test_heavy_nesting_future(self):
+        users = self.tables.users
+
+        from sqlalchemy.future import Engine
+
+        engine = Engine._future_facade(testing.db)
+        with Session(engine, autocommit=False) as session:
+            session.begin()
+            session.connection().execute(users.insert().values(name="user1"))
+            with assertions.expect_deprecated_20(subtransactions_dep):
+                session.begin(subtransactions=True)
+            session.begin_nested()
+            session.connection().execute(users.insert().values(name="user2"))
+            assert (
+                session.connection()
+                .exec_driver_sql("select count(1) from users")
+                .scalar()
+                == 2
+            )
+            session.rollback()
+            assert (
+                session.connection()
+                .exec_driver_sql("select count(1) from users")
+                .scalar()
+                == 1
+            )
+            session.connection().execute(users.insert().values(name="user3"))
+            session.commit()
+            assert (
+                session.connection()
+                .exec_driver_sql("select count(1) from users")
+                .scalar()
+                == 2
+            )
+
+    @testing.requires.savepoints
+    def test_mixed_transaction_control(self):
+        users, User = self.tables.users, self.classes.User
+
+        self.mapper_registry.map_imperatively(User, users)
+
+        with assertions.expect_deprecated_20(autocommit_dep):
+            sess = fixture_session(autocommit=True)
+
+        sess.begin()
+        sess.begin_nested()
+        with assertions.expect_deprecated_20(subtransactions_dep):
+            transaction = sess.begin(subtransactions=True)
+
+        sess.add(User(name="u1"))
+
+        transaction.commit()
+        sess.commit()
+        sess.commit()
+
+        sess.close()
+
+        eq_(len(sess.query(User).all()), 1)
+
+        t1 = sess.begin()
+        t2 = sess.begin_nested()
+
+        sess.add(User(name="u2"))
+
+        t2.commit()
+        assert sess._legacy_transaction() is t1
+
+        sess.close()
+
+    @testing.requires.savepoints
+    def test_nested_transaction_connection_add_autocommit(self):
+        users, User = self.tables.users, self.classes.User
+
+        self.mapper_registry.map_imperatively(User, users)
+
+        with assertions.expect_deprecated_20(autocommit_dep):
+            sess = fixture_session(autocommit=True)
+
+        sess.begin()
+        sess.begin_nested()
+
+        u1 = User(name="u1")
+        sess.add(u1)
+        sess.flush()
+
+        sess.rollback()
+
+        u2 = User(name="u2")
+        sess.add(u2)
+
+        sess.commit()
+
+        eq_(set(sess.query(User).all()), set([u2]))
+
+        sess.begin()
+        sess.begin_nested()
+
+        u3 = User(name="u3")
+        sess.add(u3)
+        sess.commit()  # commit the nested transaction
+        sess.rollback()
+
+        eq_(set(sess.query(User).all()), set([u2]))
+
+        sess.close()
+
+    def test_active_flag_autocommit(self):
+        with assertions.expect_deprecated_20(autocommit_dep):
+            sess = Session(bind=testing.db, autocommit=True)
+        assert not sess.is_active
+        sess.begin()
+        assert sess.is_active
+        sess.rollback()
+        assert not sess.is_active
+
+
 class SessionEventsTest(_RemoveListeners, _fixtures.FixtureTest):
     run_inserts = None
 
@@ -7407,6 +7911,89 @@ class CacheKeyTest(CacheKeyFixture, _fixtures.FixtureTest):
         )
 
 
+class PolyCacheKeyTest(CacheKeyFixture, _poly_fixtures._Polymorphic):
+    run_setup_mappers = "once"
+    run_inserts = None
+    run_deletes = None
+
+    def _stmt_20(self, *elements):
+        return tuple(
+            elem._statement_20() if isinstance(elem, sa.orm.Query) else elem
+            for elem in elements
+        )
+
+    def test_wp_queries(self):
+        Person, Manager, Engineer, Boss = self.classes(
+            "Person", "Manager", "Engineer", "Boss"
+        )
+
+        def one():
+            with assertions.expect_deprecated_20(w_polymorphic_dep):
+                return (
+                    fixture_session()
+                    .query(Person)
+                    .with_polymorphic([Manager, Engineer])
+                )
+
+        def two():
+            wp = with_polymorphic(Person, [Manager, Engineer])
+
+            return fixture_session().query(wp)
+
+        def three():
+            wp = with_polymorphic(Person, [Manager, Engineer])
+
+            return fixture_session().query(wp).filter(wp.name == "asdfo")
+
+        def three_a():
+            wp = with_polymorphic(Person, [Manager, Engineer], flat=True)
+
+            return fixture_session().query(wp).filter(wp.name == "asdfo")
+
+        def four():
+            with assertions.expect_deprecated_20(w_polymorphic_dep):
+                return (
+                    fixture_session()
+                    .query(Person)
+                    .with_polymorphic([Manager, Engineer])
+                    .filter(Person.name == "asdf")
+                )
+
+        def five():
+            subq = (
+                select(Person)
+                .outerjoin(Manager)
+                .outerjoin(Engineer)
+                .subquery()
+            )
+            wp = with_polymorphic(Person, [Manager, Engineer], subq)
+
+            return fixture_session().query(wp).filter(wp.name == "asdfo")
+
+        def six():
+            subq = (
+                select(Person)
+                .outerjoin(Manager)
+                .outerjoin(Engineer)
+                .subquery()
+            )
+
+            with assertions.expect_deprecated_20(w_polymorphic_dep):
+                return (
+                    fixture_session()
+                    .query(Person)
+                    .with_polymorphic([Manager, Engineer], subq)
+                    .filter(Person.name == "asdfo")
+                )
+
+        self._run_cache_key_fixture(
+            lambda: self._stmt_20(
+                one(), two(), three(), three_a(), four(), five(), six()
+            ),
+            compare_values=True,
+        )
+
+
 class AliasedClassRelationshipTest(
     PartitionByFixture, testing.AssertsCompiledSQL
 ):
index 3b09d8eb18d63db0dedaef7651736ae316f9df62..5be0042b0dad3981ad36ad49a51de9f85f590542 100644 (file)
@@ -2757,12 +2757,13 @@ class InheritanceToRelatedTest(fixtures.MappedTest):
         )
         s = Session(testing.db)
 
+        fp = with_polymorphic(Foo, [Bar, Baz])
+
         def go():
             eq_(
-                s.query(Foo)
-                .with_polymorphic([Bar, Baz])
-                .order_by(Foo.id)
-                .options(subqueryload(Foo.related))
+                s.query(fp)
+                .order_by(fp.id)
+                .options(subqueryload(fp.related))
                 .all(),
                 [
                     Bar(id=1, related=Related(id=1)),
@@ -2784,12 +2785,13 @@ class InheritanceToRelatedTest(fixtures.MappedTest):
         )
         s = Session(testing.db)
 
+        fp = with_polymorphic(Foo, [Bar, Baz])
+
         def go():
             eq_(
-                s.query(Foo)
-                .with_polymorphic([Bar, Baz])
-                .order_by(Foo.id)
-                .options(joinedload(Foo.related))
+                s.query(fp)
+                .order_by(fp.id)
+                .options(joinedload(fp.related))
                 .all(),
                 [
                     Bar(id=1, related=Related(id=1)),