result in their table being appended directly to the FROM clause
which will usually lead to incorrect results.
+ When left at its default value of ``False``, the polymorphic
+ selectable assigned to the base mapper is used for selecting rows.
+ However, it may also be passed as ``None``, which will bypass the
+ configured polymorphic selectable and instead construct an ad-hoc
+ selectable for the target classes given; for joined table inheritance
+ this will be a join that includes all target mappers and their
+ subclasses.
+
:param polymorphic_on: a column to be used as the "discriminator"
column for the given selectable. If not given, the polymorphic_on
attribute of the base classes' mapper will be used, if any. This
# be "error" however for I98b8defdf7c37b818b3824d02f7668e3f5f31c94
# we are moving one at a time
for msg in [
- #
- # ORM Query
- #
- r"The Query.with_polymorphic\(\) method is considered "
- "legacy as of the 1.x series",
#
# ORM Session
#
from sqlalchemy.orm import declared_attr
from sqlalchemy.orm import deferred
from sqlalchemy.orm import relationship
+from sqlalchemy.orm import with_polymorphic
from sqlalchemy.orm.decl_api import registry
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
sess.add(c2)
sess.flush()
sess.expunge_all()
+
+ wp = with_polymorphic(Person, [Engineer])
eq_(
- sess.query(Person)
- .with_polymorphic(Engineer)
- .filter(Engineer.primary_language == "cobol")
+ sess.query(wp)
+ .filter(wp.Engineer.primary_language == "cobol")
.first(),
Engineer(name="vlad"),
)
from sqlalchemy import ForeignKey
from sqlalchemy import Integer
+from sqlalchemy import literal
+from sqlalchemy import null
+from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import testing
+from sqlalchemy import union_all
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import attributes
from sqlalchemy.orm import class_mapper
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import polymorphic_union
from sqlalchemy.orm import relationship
+from sqlalchemy.orm.util import with_polymorphic
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
from sqlalchemy.testing.schema import Table
-class Employee(object):
- def __init__(self, name):
- self.name = name
-
- def __repr__(self):
- return self.__class__.__name__ + " " + self.name
-
-
-class Manager(Employee):
- def __init__(self, name, manager_data):
- self.name = name
- self.manager_data = manager_data
-
- def __repr__(self):
- return (
- self.__class__.__name__ + " " + self.name + " " + self.manager_data
- )
-
-
-class Engineer(Employee):
- def __init__(self, name, engineer_info):
- self.name = name
- self.engineer_info = engineer_info
-
- def __repr__(self):
- return (
- self.__class__.__name__
- + " "
- + self.name
- + " "
- + self.engineer_info
- )
-
-
-class Hacker(Engineer):
- def __init__(self, name, nickname, engineer_info):
- self.name = name
- self.nickname = nickname
- self.engineer_info = engineer_info
-
- def __repr__(self):
- return (
- self.__class__.__name__
- + " "
- + self.name
- + " '"
- + self.nickname
- + "' "
- + self.engineer_info
- )
-
-
-class Company(object):
- pass
-
-
class ConcreteTest(fixtures.MappedTest):
@classmethod
def define_tables(cls, metadata):
- global managers_table, engineers_table, hackers_table
- global companies, employees_table
- companies = Table(
+ Table(
"companies",
metadata,
Column(
),
Column("name", String(50)),
)
- employees_table = Table(
+ Table(
"employees",
metadata,
Column(
Column("name", String(50)),
Column("company_id", Integer, ForeignKey("companies.id")),
)
- managers_table = Table(
+ Table(
"managers",
metadata,
Column(
Column("manager_data", String(50)),
Column("company_id", Integer, ForeignKey("companies.id")),
)
- engineers_table = Table(
+ Table(
"engineers",
metadata,
Column(
Column("engineer_info", String(50)),
Column("company_id", Integer, ForeignKey("companies.id")),
)
- hackers_table = Table(
+ Table(
"hackers",
metadata,
Column(
Column("nickname", String(50)),
)
+ @classmethod
+ def setup_classes(cls):
+ class Employee(cls.Basic):
+ def __init__(self, name):
+ self.name = name
+
+ def __repr__(self):
+ return self.__class__.__name__ + " " + self.name
+
+ class Manager(Employee):
+ def __init__(self, name, manager_data):
+ self.name = name
+ self.manager_data = manager_data
+
+ def __repr__(self):
+ return (
+ self.__class__.__name__
+ + " "
+ + self.name
+ + " "
+ + self.manager_data
+ )
+
+ class Engineer(Employee):
+ def __init__(self, name, engineer_info):
+ self.name = name
+ self.engineer_info = engineer_info
+
+ def __repr__(self):
+ return (
+ self.__class__.__name__
+ + " "
+ + self.name
+ + " "
+ + self.engineer_info
+ )
+
+ class Hacker(Engineer):
+ def __init__(self, name, nickname, engineer_info):
+ self.name = name
+ self.nickname = nickname
+ self.engineer_info = engineer_info
+
+ def __repr__(self):
+ return (
+ self.__class__.__name__
+ + " "
+ + self.name
+ + " '"
+ + self.nickname
+ + "' "
+ + self.engineer_info
+ )
+
+ class Company(cls.Basic):
+ pass
+
def test_basic(self):
+ Employee, Engineer, Manager = self.classes(
+ "Employee", "Engineer", "Manager"
+ )
+ engineers_table, managers_table = self.tables("engineers", "managers")
+
pjoin = polymorphic_union(
{"manager": managers_table, "engineer": engineers_table},
"type",
polymorphic_identity="engineer",
)
session = fixture_session()
- session.add(Manager("Tom", "knows how to manage things"))
- session.add(Engineer("Kurt", "knows how to hack"))
+ session.add(Manager("Sally", "knows how to manage things"))
+ session.add(Engineer("Karina", "knows how to hack"))
session.flush()
session.expunge_all()
assert set([repr(x) for x in session.query(Employee)]) == set(
[
- "Engineer Kurt knows how to hack",
- "Manager Tom knows how to manage things",
+ "Engineer Karina knows how to hack",
+ "Manager Sally knows how to manage things",
]
)
assert set([repr(x) for x in session.query(Manager)]) == set(
- ["Manager Tom knows how to manage things"]
+ ["Manager Sally knows how to manage things"]
)
assert set([repr(x) for x in session.query(Engineer)]) == set(
- ["Engineer Kurt knows how to hack"]
+ ["Engineer Karina knows how to hack"]
)
manager = session.query(Manager).one()
session.expire(manager, ["manager_data"])
eq_(manager.manager_data, "knows how to manage things")
def test_multi_level_no_base(self):
+ Employee, Engineer, Manager = self.classes(
+ "Employee", "Engineer", "Manager"
+ )
+ (Hacker,) = self.classes("Hacker")
+ engineers_table, managers_table = self.tables("engineers", "managers")
+ (hackers_table,) = self.tables("hackers")
+
pjoin = polymorphic_union(
{
"manager": managers_table,
polymorphic_identity="hacker",
)
session = fixture_session()
- tom = Manager("Tom", "knows how to manage things")
+ sally = Manager("Sally", "knows how to manage things")
assert_raises_message(
AttributeError,
"does not implement attribute .?'type' at the instance level.",
setattr,
- tom,
+ sally,
"type",
"sometype",
)
- jerry = Engineer("Jerry", "knows how to program")
- hacker = Hacker("Kurt", "Badass", "knows how to hack")
+ jenn = Engineer("Jenn", "knows how to program")
+ hacker = Hacker("Karina", "Badass", "knows how to hack")
assert_raises_message(
AttributeError,
"sometype",
)
- session.add_all((tom, jerry, hacker))
+ session.add_all((sally, jenn, hacker))
session.flush()
# ensure "readonly" on save logic didn't pollute the
assert (
"nickname"
- not in attributes.instance_state(jerry).expired_attributes
- )
- assert (
- "name" not in attributes.instance_state(jerry).expired_attributes
+ not in attributes.instance_state(jenn).expired_attributes
)
+ assert "name" not in attributes.instance_state(jenn).expired_attributes
assert (
"name" not in attributes.instance_state(hacker).expired_attributes
)
)
def go():
- eq_(jerry.name, "Jerry")
+ eq_(jenn.name, "Jenn")
eq_(hacker.nickname, "Badass")
self.assert_sql_count(testing.db, go, 0)
session.expunge_all()
assert (
- repr(session.query(Employee).filter(Employee.name == "Tom").one())
- == "Manager Tom knows how to manage things"
+ repr(
+ session.query(Employee).filter(Employee.name == "Sally").one()
+ )
+ == "Manager Sally knows how to manage things"
)
assert (
- repr(session.query(Manager).filter(Manager.name == "Tom").one())
- == "Manager Tom knows how to manage things"
+ repr(session.query(Manager).filter(Manager.name == "Sally").one())
+ == "Manager Sally knows how to manage things"
)
assert set([repr(x) for x in session.query(Employee).all()]) == set(
[
- "Engineer Jerry knows how to program",
- "Manager Tom knows how to manage things",
- "Hacker Kurt 'Badass' knows how to hack",
+ "Engineer Jenn knows how to program",
+ "Manager Sally knows how to manage things",
+ "Hacker Karina 'Badass' knows how to hack",
]
)
assert set([repr(x) for x in session.query(Manager).all()]) == set(
- ["Manager Tom knows how to manage things"]
+ ["Manager Sally knows how to manage things"]
)
assert set([repr(x) for x in session.query(Engineer).all()]) == set(
[
- "Engineer Jerry knows how to program",
- "Hacker Kurt 'Badass' knows how to hack",
+ "Engineer Jenn knows how to program",
+ "Hacker Karina 'Badass' knows how to hack",
]
)
assert set([repr(x) for x in session.query(Hacker).all()]) == set(
- ["Hacker Kurt 'Badass' knows how to hack"]
+ ["Hacker Karina 'Badass' knows how to hack"]
)
def test_multi_level_no_base_w_hybrid(self):
+ Employee, Engineer, Manager = self.classes(
+ "Employee", "Engineer", "Manager"
+ )
+ (Hacker,) = self.classes("Hacker")
+ engineers_table, managers_table = self.tables("engineers", "managers")
+ (hackers_table,) = self.tables("hackers")
+
pjoin = polymorphic_union(
{
"manager": managers_table,
)
session = fixture_session()
- tom = ManagerWHybrid("Tom", "mgrdata")
+ sally = ManagerWHybrid("Sally", "mgrdata")
# mapping did not impact the engineer_info
# hybrid in any way
eq_(test_calls.mock_calls, [])
- eq_(tom.engineer_info, "mgrdata")
+ eq_(sally.engineer_info, "mgrdata")
eq_(test_calls.mock_calls, [mock.call.engineer_info_instance()])
- session.add(tom)
+ session.add(sally)
session.commit()
session.close()
- tom = (
+ Sally = (
session.query(ManagerWHybrid)
.filter(ManagerWHybrid.engineer_info == "mgrdata")
.one()
mock.call.engineer_info_class(),
],
)
- eq_(tom.engineer_info, "mgrdata")
+ eq_(Sally.engineer_info, "mgrdata")
def test_multi_level_with_base(self):
+ Employee, Engineer, Manager = self.classes(
+ "Employee", "Engineer", "Manager"
+ )
+ employees_table, engineers_table, managers_table = self.tables(
+ "employees", "engineers", "managers"
+ )
+ (hackers_table,) = self.tables("hackers")
+ (Hacker,) = self.classes("Hacker")
+
pjoin = polymorphic_union(
{
"employee": employees_table,
polymorphic_identity="hacker",
)
session = fixture_session()
- tom = Manager("Tom", "knows how to manage things")
- jerry = Engineer("Jerry", "knows how to program")
- hacker = Hacker("Kurt", "Badass", "knows how to hack")
- session.add_all((tom, jerry, hacker))
+ sally = Manager("Sally", "knows how to manage things")
+ jenn = Engineer("Jenn", "knows how to program")
+ hacker = Hacker("Karina", "Badass", "knows how to hack")
+ session.add_all((sally, jenn, hacker))
session.flush()
def go():
- eq_(jerry.name, "Jerry")
+ eq_(jenn.name, "Jenn")
eq_(hacker.nickname, "Badass")
self.assert_sql_count(testing.db, go, 0)
)
assert set([repr(x) for x in session.query(Employee)]) == set(
[
- "Engineer Jerry knows how to program",
- "Manager Tom knows how to manage things",
- "Hacker Kurt 'Badass' knows how to hack",
+ "Engineer Jenn knows how to program",
+ "Manager Sally knows how to manage things",
+ "Hacker Karina 'Badass' knows how to hack",
]
)
assert set([repr(x) for x in session.query(Manager)]) == set(
- ["Manager Tom knows how to manage things"]
+ ["Manager Sally knows how to manage things"]
)
assert set([repr(x) for x in session.query(Engineer)]) == set(
[
- "Engineer Jerry knows how to program",
- "Hacker Kurt 'Badass' knows how to hack",
+ "Engineer Jenn knows how to program",
+ "Hacker Karina 'Badass' knows how to hack",
]
)
assert set([repr(x) for x in session.query(Hacker)]) == set(
- ["Hacker Kurt 'Badass' knows how to hack"]
+ ["Hacker Karina 'Badass' knows how to hack"]
)
- def test_without_default_polymorphic(self):
+ @testing.fixture
+ def two_pjoin_fixture(self):
+ Employee, Engineer, Manager = self.classes(
+ "Employee", "Engineer", "Manager"
+ )
+ (Hacker,) = self.classes("Hacker")
+ (employees_table,) = self.tables("employees")
+ engineers_table, managers_table = self.tables("engineers", "managers")
+ (hackers_table,) = self.tables("hackers")
+
pjoin = polymorphic_union(
{
"employee": employees_table,
concrete=True,
polymorphic_identity="hacker",
)
- session = fixture_session()
+
+ session = fixture_session(expire_on_commit=False)
jdoe = Employee("Jdoe")
- tom = Manager("Tom", "knows how to manage things")
- jerry = Engineer("Jerry", "knows how to program")
- hacker = Hacker("Kurt", "Badass", "knows how to hack")
- session.add_all((jdoe, tom, jerry, hacker))
- session.flush()
+ sally = Manager("Sally", "knows how to manage things")
+ jenn = Engineer("Jenn", "knows how to program")
+ hacker = Hacker("Karina", "Badass", "knows how to hack")
+ session.add_all((jdoe, sally, jenn, hacker))
+ session.commit()
+
+ return (
+ session,
+ Employee,
+ Engineer,
+ Manager,
+ Hacker,
+ pjoin,
+ pjoin2,
+ jdoe,
+ sally,
+ jenn,
+ hacker,
+ )
+
+ def test_without_default_polymorphic_one(self, two_pjoin_fixture):
+ (
+ session,
+ Employee,
+ Engineer,
+ Manager,
+ Hacker,
+ pjoin,
+ pjoin2,
+ jdoe,
+ sally,
+ jenn,
+ hacker,
+ ) = two_pjoin_fixture
+
+ wp = with_polymorphic(
+ Employee, "*", pjoin, polymorphic_on=pjoin.c.type
+ )
+
eq_(
- len(
- session.connection()
- .execute(
- session.query(Employee)
- .with_polymorphic("*", pjoin, pjoin.c.type)
- .statement
- )
- .fetchall()
- ),
- 4,
+ sorted([repr(x) for x in session.query(wp)]),
+ [
+ "Employee Jdoe",
+ "Engineer Jenn knows how to program",
+ "Hacker Karina 'Badass' knows how to hack",
+ "Manager Sally knows how to manage things",
+ ],
)
eq_(session.get(Employee, jdoe.employee_id), jdoe)
- eq_(session.get(Engineer, jerry.employee_id), jerry)
+ eq_(session.get(Engineer, jenn.employee_id), jenn)
+
+ def test_without_default_polymorphic_two(self, two_pjoin_fixture):
+ (
+ session,
+ Employee,
+ Engineer,
+ Manager,
+ Hacker,
+ pjoin,
+ pjoin2,
+ jdoe,
+ sally,
+ jenn,
+ hacker,
+ ) = two_pjoin_fixture
+ wp = with_polymorphic(
+ Employee, "*", pjoin, polymorphic_on=pjoin.c.type
+ )
+
eq_(
- set(
- [
- repr(x)
- for x in session.query(Employee).with_polymorphic(
- "*", pjoin, pjoin.c.type
- )
- ]
- ),
- set(
- [
- "Employee Jdoe",
- "Engineer Jerry knows how to program",
- "Manager Tom knows how to manage things",
- "Hacker Kurt 'Badass' knows how to hack",
- ]
- ),
+ sorted([repr(x) for x in session.query(wp)]),
+ [
+ "Employee Jdoe",
+ "Engineer Jenn knows how to program",
+ "Hacker Karina 'Badass' knows how to hack",
+ "Manager Sally knows how to manage things",
+ ],
)
+
+ def test_without_default_polymorphic_three(self, two_pjoin_fixture):
+ (
+ session,
+ Employee,
+ Engineer,
+ Manager,
+ Hacker,
+ pjoin,
+ pjoin2,
+ jdoe,
+ sally,
+ jenn,
+ hacker,
+ ) = two_pjoin_fixture
eq_(
- set([repr(x) for x in session.query(Manager)]),
- set(["Manager Tom knows how to manage things"]),
+ sorted([repr(x) for x in session.query(Manager)]),
+ ["Manager Sally knows how to manage things"],
+ )
+
+ def test_without_default_polymorphic_four(self, two_pjoin_fixture):
+ (
+ session,
+ Employee,
+ Engineer,
+ Manager,
+ Hacker,
+ pjoin,
+ pjoin2,
+ jdoe,
+ sally,
+ jenn,
+ hacker,
+ ) = two_pjoin_fixture
+ wp2 = with_polymorphic(
+ Engineer, "*", pjoin2, polymorphic_on=pjoin2.c.type
)
eq_(
- set(
- [
- repr(x)
- for x in session.query(Engineer).with_polymorphic(
- "*", pjoin2, pjoin2.c.type
- )
- ]
- ),
- set(
- [
- "Engineer Jerry knows how to program",
- "Hacker Kurt 'Badass' knows how to hack",
- ]
- ),
+ sorted([repr(x) for x in session.query(wp2)]),
+ [
+ "Engineer Jenn knows how to program",
+ "Hacker Karina 'Badass' knows how to hack",
+ ],
)
+
+ def test_without_default_polymorphic_five(self, two_pjoin_fixture):
+ (
+ session,
+ Employee,
+ Engineer,
+ Manager,
+ Hacker,
+ pjoin,
+ pjoin2,
+ jdoe,
+ sally,
+ jenn,
+ hacker,
+ ) = two_pjoin_fixture
eq_(
- set([repr(x) for x in session.query(Hacker)]),
- set(["Hacker Kurt 'Badass' knows how to hack"]),
+ [repr(x) for x in session.query(Hacker)],
+ ["Hacker Karina 'Badass' knows how to hack"],
)
- # test adaption of the column by wrapping the query in a
- # subquery
+ def test_without_default_polymorphic_six(self, two_pjoin_fixture):
+ (
+ session,
+ Employee,
+ Engineer,
+ Manager,
+ Hacker,
+ pjoin,
+ pjoin2,
+ jdoe,
+ sally,
+ jenn,
+ hacker,
+ ) = two_pjoin_fixture
- with testing.expect_deprecated(r"The Query.from_self\(\) method"):
- eq_(
- len(
- session.connection()
- .execute(
- session.query(Engineer)
- .with_polymorphic("*", pjoin2, pjoin2.c.type)
- .from_self()
- .statement
- )
- .fetchall()
- ),
- 2,
+ # this test is adapting what used to use from_self().
+ # it's a weird test but how we would do this would be we would only
+ # apply with_polymorprhic once, after we've created whatever
+ # subquery we want.
+
+ subq = pjoin2.select().subquery()
+
+ wp2 = with_polymorphic(Engineer, "*", subq, polymorphic_on=subq.c.type)
+
+ eq_(
+ sorted([repr(x) for x in session.query(wp2)]),
+ [
+ "Engineer Jenn knows how to program",
+ "Hacker Karina 'Badass' knows how to hack",
+ ],
+ )
+
+ @testing.combinations(True, False, argnames="use_star")
+ def test_without_default_polymorphic_buildit_newstyle(
+ self, two_pjoin_fixture, use_star
+ ):
+ """how would we do these concrete polymorphic queries using 2.0 style,
+ and not any old and esoteric features like "polymorphic_union" ?
+
+ """
+ (
+ session,
+ Employee,
+ Engineer,
+ Manager,
+ Hacker,
+ pjoin,
+ pjoin2,
+ jdoe,
+ sally,
+ jenn,
+ hacker,
+ ) = two_pjoin_fixture
+
+ # make a union using the entities as given and wpoly from it.
+ # a UNION is a UNION. there is no way around having to write
+ # out filler columns. concrete inh is really not a good choice
+ # when you need to select heterogeneously
+ stmt = union_all(
+ select(
+ literal("engineer").label("type"),
+ Engineer,
+ null().label("nickname"),
+ ),
+ select(literal("hacker").label("type"), Hacker),
+ ).subquery()
+
+ # issue: if we make this with_polymorphic(Engineer, [Hacker], ...),
+ # it blows up and tries to add the "engineer" table for unknown reasons
+
+ if use_star:
+ wp = with_polymorphic(
+ Engineer, "*", stmt, polymorphic_on=stmt.c.type
)
- with testing.expect_deprecated(r"The Query.from_self\(\) method"):
- eq_(
- set(
- [
- repr(x)
- for x in session.query(Engineer)
- .with_polymorphic("*", pjoin2, pjoin2.c.type)
- .from_self()
- ]
- ),
- set(
- [
- "Engineer Jerry knows how to program",
- "Hacker Kurt 'Badass' knows how to hack",
- ]
- ),
+ else:
+ wp = with_polymorphic(
+ Engineer, [Engineer, Hacker], stmt, polymorphic_on=stmt.c.type
)
+ result = session.execute(select(wp)).scalars()
+
+ eq_(
+ sorted(repr(obj) for obj in result),
+ [
+ "Engineer Jenn knows how to program",
+ "Hacker Karina 'Badass' knows how to hack",
+ ],
+ )
+
def test_relationship(self):
+ Employee, Engineer, Manager = self.classes(
+ "Employee", "Engineer", "Manager"
+ )
+ (Company,) = self.classes("Company")
+ (companies,) = self.tables("companies")
+ engineers_table, managers_table = self.tables("engineers", "managers")
+
pjoin = polymorphic_union(
{"manager": managers_table, "engineer": engineers_table},
"type",
)
session = fixture_session()
c = Company()
- c.employees.append(Manager("Tom", "knows how to manage things"))
- c.employees.append(Engineer("Kurt", "knows how to hack"))
+ c.employees.append(Manager("Sally", "knows how to manage things"))
+ c.employees.append(Engineer("Karina", "knows how to hack"))
session.add(c)
session.flush()
session.expunge_all()
c2 = session.get(Company, c.id)
assert set([repr(x) for x in c2.employees]) == set(
[
- "Engineer Kurt knows how to hack",
- "Manager Tom knows how to manage things",
+ "Engineer Karina knows how to hack",
+ "Manager Sally knows how to manage things",
]
)
)
assert set([repr(x) for x in c2.employees]) == set(
[
- "Engineer Kurt knows how to hack",
- "Manager Tom knows how to manage things",
+ "Engineer Karina knows how to hack",
+ "Manager Sally knows how to manage things",
]
)
eq_(merged_c1.some_dest_id, c1.some_dest_id)
-class ManyToManyTest(fixtures.MappedTest):
+class ManySallyanyTest(fixtures.MappedTest):
@classmethod
def define_tables(cls, metadata):
Table(
),
)
Table(
- "base_mtom",
+ "base_mSally",
metadata,
Column(
"base_id", Integer, ForeignKey("base.id"), primary_key=True
),
)
Table(
- "sub_mtom",
+ "sub_mSally",
metadata,
Column("base_id", Integer, ForeignKey("sub.id"), primary_key=True),
Column(
pass
def test_selective_relationships(self):
- sub, base_mtom, Related, Base, related, sub_mtom, base, Sub = (
+ sub, base_mSally, Related, Base, related, sub_mSally, base, Sub = (
self.tables.sub,
- self.tables.base_mtom,
+ self.tables.base_mSally,
self.classes.Related,
self.classes.Base,
self.tables.related,
- self.tables.sub_mtom,
+ self.tables.sub_mSally,
self.tables.base,
self.classes.Sub,
)
properties={
"related": relationship(
Related,
- secondary=base_mtom,
+ secondary=base_mSally,
backref="bases",
order_by=related.c.id,
)
properties={
"related": relationship(
Related,
- secondary=sub_mtom,
+ secondary=sub_mSally,
backref="subs",
order_by=related.c.id,
)
+from sqlalchemy import exc as sa_exc
from sqlalchemy import ForeignKey
+from sqlalchemy import func
from sqlalchemy import Integer
+from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
+from sqlalchemy import null
+from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import testing
+from sqlalchemy.orm import aliased
+from sqlalchemy.orm import joinedload
+from sqlalchemy.orm import polymorphic_union
from sqlalchemy.orm import relationship
+from sqlalchemy.orm import selectinload
+from sqlalchemy.orm import Session
+from sqlalchemy.testing import assert_raises
+from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.assertions import expect_deprecated_20
from ._poly_fixtures import _PolymorphicJoins
from ._poly_fixtures import _PolymorphicPolymorphic
from ._poly_fixtures import _PolymorphicUnions
+from ._poly_fixtures import Boss
from ._poly_fixtures import Company
from ._poly_fixtures import Engineer
+from ._poly_fixtures import Machine
from ._poly_fixtures import Manager
from ._poly_fixtures import Paperwork
from ._poly_fixtures import Person
r"to Query.join\(\) are deprecated"
)
+with_polymorphic_dep = (
+ r"The Query.with_polymorphic\(\) method is considered legacy as of "
+ r"the 1.x series of SQLAlchemy and will be removed in 2.0"
+)
+
class _PolymorphicTestBase(fixtures.NoCache):
__backend__ = True
def test_join_from_with_polymorphic_flag_aliased_one(self):
sess = fixture_session()
- with expect_deprecated_20(aliased_jp_dep):
+ with expect_deprecated_20(aliased_jp_dep, with_polymorphic_dep):
eq_(
sess.query(Person)
.with_polymorphic(Manager)
def test_join_from_with_polymorphic_flag_aliased_two(self):
sess = fixture_session()
- with expect_deprecated_20(aliased_jp_dep):
+ with expect_deprecated_20(aliased_jp_dep, with_polymorphic_dep):
eq_(
sess.query(Person)
.with_polymorphic([Manager, Engineer])
[e1],
)
+ def test_with_polymorphic_one(self):
+ sess = fixture_session()
+
+ def go():
+ with expect_deprecated_20(with_polymorphic_dep):
+ eq_(
+ sess.query(Person)
+ .with_polymorphic(Engineer)
+ .filter(Engineer.primary_language == "java")
+ .all(),
+ self._emps_wo_relationships_fixture()[0:1],
+ )
+
+ self.assert_sql_count(testing.db, go, 1)
+
+ def test_with_polymorphic_two(self):
+ sess = fixture_session()
+
+ def go():
+ with expect_deprecated_20(with_polymorphic_dep):
+ eq_(
+ sess.query(Person)
+ .with_polymorphic("*")
+ .order_by(Person.person_id)
+ .all(),
+ self._emps_wo_relationships_fixture(),
+ )
+
+ self.assert_sql_count(testing.db, go, 1)
+
+ def test_with_polymorphic_three(self):
+ sess = fixture_session()
+
+ def go():
+ with expect_deprecated_20(with_polymorphic_dep):
+ eq_(
+ sess.query(Person)
+ .with_polymorphic(Engineer)
+ .order_by(Person.person_id)
+ .all(),
+ self._emps_wo_relationships_fixture(),
+ )
+
+ self.assert_sql_count(testing.db, go, 3)
+
+ def test_with_polymorphic_four(self):
+ sess = fixture_session()
+
+ def go():
+ with expect_deprecated_20(with_polymorphic_dep):
+ eq_(
+ sess.query(Person)
+ .with_polymorphic(Engineer, people.outerjoin(engineers))
+ .order_by(Person.person_id)
+ .all(),
+ self._emps_wo_relationships_fixture(),
+ )
+
+ self.assert_sql_count(testing.db, go, 3)
+
+ def test_with_polymorphic_five(self):
+ sess = fixture_session()
+
+ def go():
+ # limit the polymorphic join down to just "Person",
+ # overriding select_table
+ with expect_deprecated_20(with_polymorphic_dep):
+ eq_(
+ sess.query(Person).with_polymorphic(Person).all(),
+ self._emps_wo_relationships_fixture(),
+ )
+
+ self.assert_sql_count(testing.db, go, 6)
+
+ def test_with_polymorphic_six(self):
+ sess = fixture_session()
+
+ with expect_deprecated_20(with_polymorphic_dep):
+ assert_raises(
+ sa_exc.InvalidRequestError,
+ sess.query(Person).with_polymorphic,
+ Paperwork,
+ )
+ with expect_deprecated_20(with_polymorphic_dep):
+ assert_raises(
+ sa_exc.InvalidRequestError,
+ sess.query(Engineer).with_polymorphic,
+ Boss,
+ )
+ with expect_deprecated_20(with_polymorphic_dep):
+ assert_raises(
+ sa_exc.InvalidRequestError,
+ sess.query(Engineer).with_polymorphic,
+ Person,
+ )
+
+ def test_with_polymorphic_seven(self):
+ sess = fixture_session()
+ # compare to entities without related collections to prevent
+ # additional lazy SQL from firing on loaded entities
+ with expect_deprecated_20(with_polymorphic_dep):
+ eq_(
+ sess.query(Person)
+ .with_polymorphic("*")
+ .order_by(Person.person_id)
+ .all(),
+ self._emps_wo_relationships_fixture(),
+ )
+
+ def test_joinedload_on_subclass(self):
+ sess = fixture_session()
+ expected = [
+ Engineer(
+ name="dilbert",
+ engineer_name="dilbert",
+ primary_language="java",
+ status="regular engineer",
+ machines=[
+ Machine(name="IBM ThinkPad"),
+ Machine(name="IPhone"),
+ ],
+ )
+ ]
+
+ def go():
+ # test load People with joinedload to engineers + machines
+ with expect_deprecated_20(with_polymorphic_dep):
+ eq_(
+ sess.query(Person)
+ .with_polymorphic("*")
+ .options(joinedload(Engineer.machines))
+ .filter(Person.name == "dilbert")
+ .all(),
+ expected,
+ )
+
+ self.assert_sql_count(testing.db, go, 1)
+
+ def test_primary_eager_aliasing_three(self):
+
+ # assert the JOINs don't over JOIN
+
+ sess = fixture_session()
+
+ def go():
+ with expect_deprecated_20(with_polymorphic_dep):
+ eq_(
+ sess.query(Person)
+ .with_polymorphic("*")
+ .order_by(Person.person_id)
+ .options(joinedload(Engineer.machines))[1:3],
+ all_employees[1:3],
+ )
+
+ self.assert_sql_count(testing.db, go, 3)
+
+ with expect_deprecated_20(with_polymorphic_dep):
+ eq_(
+ sess.scalar(
+ select(func.count("*")).select_from(
+ sess.query(Person)
+ .with_polymorphic("*")
+ .options(joinedload(Engineer.machines))
+ .order_by(Person.person_id)
+ .limit(2)
+ .offset(1)
+ .subquery()
+ )
+ ),
+ 2,
+ )
+
+ def test_join_from_with_polymorphic_nonaliased_one(self):
+ sess = fixture_session()
+ with expect_deprecated_20(with_polymorphic_dep):
+ eq_(
+ sess.query(Person)
+ .with_polymorphic(Manager)
+ .order_by(Person.person_id)
+ .join(Person.paperwork)
+ .filter(Paperwork.description.like("%review%"))
+ .all(),
+ [b1, m1],
+ )
+
+ def test_join_from_with_polymorphic_nonaliased_two(self):
+ sess = fixture_session()
+ with expect_deprecated_20(with_polymorphic_dep):
+ eq_(
+ sess.query(Person)
+ .with_polymorphic([Manager, Engineer])
+ .order_by(Person.person_id)
+ .join(Person.paperwork)
+ .filter(Paperwork.description.like("%#2%"))
+ .all(),
+ [e1, m1],
+ )
+
+ def test_join_from_with_polymorphic_nonaliased_three(self):
+ sess = fixture_session()
+ with expect_deprecated_20(with_polymorphic_dep):
+ eq_(
+ sess.query(Person)
+ .with_polymorphic([Manager, Engineer])
+ .order_by(Person.person_id)
+ .join(Person.paperwork)
+ .filter(Person.name.like("%dog%"))
+ .filter(Paperwork.description.like("%#2%"))
+ .all(),
+ [m1],
+ )
+
+ def test_join_from_with_polymorphic_explicit_aliased_one(self):
+ sess = fixture_session()
+ pa = aliased(Paperwork)
+
+ with expect_deprecated_20(with_polymorphic_dep):
+ eq_(
+ sess.query(Person)
+ .with_polymorphic(Manager)
+ .join(pa, Person.paperwork)
+ .filter(pa.description.like("%review%"))
+ .all(),
+ [b1, m1],
+ )
+
+ def test_join_from_with_polymorphic_explicit_aliased_two(self):
+ sess = fixture_session()
+ pa = aliased(Paperwork)
+
+ with expect_deprecated_20(with_polymorphic_dep):
+ eq_(
+ sess.query(Person)
+ .with_polymorphic([Manager, Engineer])
+ .order_by(Person.person_id)
+ .join(pa, Person.paperwork)
+ .filter(pa.description.like("%#2%"))
+ .all(),
+ [e1, m1],
+ )
+
+ def test_join_from_with_polymorphic_aliased_three(self):
+ sess = fixture_session()
+ pa = aliased(Paperwork)
+
+ with expect_deprecated_20(with_polymorphic_dep):
+ eq_(
+ sess.query(Person)
+ .with_polymorphic([Manager, Engineer])
+ .order_by(Person.person_id)
+ .join(pa, Person.paperwork)
+ .filter(Person.name.like("%dog%"))
+ .filter(pa.description.like("%#2%"))
+ .all(),
+ [m1],
+ )
+
class PolymorphicTest(_PolymorphicTestBase, _Polymorphic):
pass
"companies.company_id = employees_1.company_id "
"AND employees_1.type IN ([POSTCOMPILE_type_1])",
)
+
+
+class SingleOnJoinedTest(fixtures.MappedTest):
+ @classmethod
+ def define_tables(cls, metadata):
+ global persons_table, employees_table
+
+ persons_table = Table(
+ "persons",
+ metadata,
+ Column(
+ "person_id",
+ Integer,
+ primary_key=True,
+ test_needs_autoincrement=True,
+ ),
+ Column("name", String(50)),
+ Column("type", String(20), nullable=False),
+ )
+
+ employees_table = Table(
+ "employees",
+ metadata,
+ Column(
+ "person_id",
+ Integer,
+ ForeignKey("persons.person_id"),
+ primary_key=True,
+ ),
+ Column("employee_data", String(50)),
+ Column("manager_data", String(50)),
+ )
+
+ def test_single_on_joined(self):
+ class Person(fixtures.ComparableEntity):
+ pass
+
+ class Employee(Person):
+ pass
+
+ class Manager(Employee):
+ pass
+
+ self.mapper_registry.map_imperatively(
+ Person,
+ persons_table,
+ polymorphic_on=persons_table.c.type,
+ polymorphic_identity="person",
+ )
+ self.mapper_registry.map_imperatively(
+ Employee,
+ employees_table,
+ inherits=Person,
+ polymorphic_identity="engineer",
+ )
+ self.mapper_registry.map_imperatively(
+ Manager, inherits=Employee, polymorphic_identity="manager"
+ )
+
+ sess = fixture_session()
+ sess.add(Person(name="p1"))
+ sess.add(Employee(name="e1", employee_data="ed1"))
+ sess.add(Manager(name="m1", employee_data="ed2", manager_data="md1"))
+ sess.flush()
+ sess.expunge_all()
+
+ eq_(
+ sess.query(Person).order_by(Person.person_id).all(),
+ [
+ Person(name="p1"),
+ Employee(name="e1", employee_data="ed1"),
+ Manager(name="m1", employee_data="ed2", manager_data="md1"),
+ ],
+ )
+ sess.expunge_all()
+
+ eq_(
+ sess.query(Employee).order_by(Person.person_id).all(),
+ [
+ Employee(name="e1", employee_data="ed1"),
+ Manager(name="m1", employee_data="ed2", manager_data="md1"),
+ ],
+ )
+ sess.expunge_all()
+
+ eq_(
+ sess.query(Manager).order_by(Person.person_id).all(),
+ [Manager(name="m1", employee_data="ed2", manager_data="md1")],
+ )
+ sess.expunge_all()
+
+ def go():
+ with expect_deprecated_20(with_polymorphic_dep):
+ eq_(
+ sess.query(Person)
+ .with_polymorphic("*")
+ .order_by(Person.person_id)
+ .all(),
+ [
+ Person(name="p1"),
+ Employee(name="e1", employee_data="ed1"),
+ Manager(
+ name="m1", employee_data="ed2", manager_data="md1"
+ ),
+ ],
+ )
+
+ self.assert_sql_count(testing.db, go, 1)
+
+
+class SingleFromPolySelectableTest(
+ fixtures.DeclarativeMappedTest, AssertsCompiledSQL
+):
+ __dialect__ = "default"
+
+ @classmethod
+ def setup_classes(cls, with_polymorphic=None, include_sub_defaults=False):
+ Base = cls.DeclarativeBasic
+
+ class Employee(Base):
+ __tablename__ = "employee"
+ id = Column(Integer, primary_key=True)
+ name = Column(String(50))
+ type = Column(String(50))
+
+ __mapper_args__ = {
+ "polymorphic_identity": "employee",
+ "polymorphic_on": type,
+ }
+
+ class Engineer(Employee):
+ __tablename__ = "engineer"
+ id = Column(Integer, ForeignKey("employee.id"), primary_key=True)
+ engineer_info = Column(String(50))
+ manager_id = Column(ForeignKey("manager.id"))
+ __mapper_args__ = {"polymorphic_identity": "engineer"}
+
+ class Manager(Employee):
+ __tablename__ = "manager"
+ id = Column(Integer, ForeignKey("employee.id"), primary_key=True)
+ manager_data = Column(String(50))
+ __mapper_args__ = {"polymorphic_identity": "manager"}
+
+ class Boss(Manager):
+ __mapper_args__ = {"polymorphic_identity": "boss"}
+
+ def _with_poly_fixture(self):
+ employee = self.classes.Employee.__table__
+ engineer = self.classes.Engineer.__table__
+ manager = self.classes.Manager.__table__
+
+ poly = (
+ select(
+ employee.c.id,
+ employee.c.type,
+ employee.c.name,
+ manager.c.manager_data,
+ null().label("engineer_info"),
+ null().label("manager_id"),
+ )
+ .select_from(employee.join(manager))
+ .set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
+ .union_all(
+ select(
+ employee.c.id,
+ employee.c.type,
+ employee.c.name,
+ null().label("manager_data"),
+ engineer.c.engineer_info,
+ engineer.c.manager_id,
+ )
+ .select_from(employee.join(engineer))
+ .set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
+ )
+ .alias()
+ )
+
+ return poly
+
+ def test_query_wpoly_single_inh_subclass(self):
+ Boss = self.classes.Boss
+
+ poly = self._with_poly_fixture()
+
+ s = fixture_session()
+
+ with expect_deprecated_20(with_polymorphic_dep):
+ q = s.query(Boss).with_polymorphic(Boss, poly)
+ self.assert_compile(
+ q,
+ "SELECT anon_1.employee_id AS anon_1_employee_id, "
+ "anon_1.employee_name AS anon_1_employee_name, "
+ "anon_1.employee_type AS anon_1_employee_type, "
+ "anon_1.manager_manager_data AS anon_1_manager_manager_data "
+ "FROM (SELECT employee.id AS employee_id, employee.type "
+ "AS employee_type, employee.name AS employee_name, "
+ "manager.manager_data AS manager_manager_data, "
+ "NULL AS engineer_info, NULL AS manager_id FROM employee "
+ "JOIN manager ON employee.id = manager.id "
+ "UNION ALL SELECT employee.id AS employee_id, "
+ "employee.type AS employee_type, employee.name AS employee_name, "
+ "NULL AS manager_data, "
+ "engineer.engineer_info AS engineer_engineer_info, "
+ "engineer.manager_id AS engineer_manager_id "
+ "FROM employee JOIN engineer ON employee.id = engineer.id) "
+ "AS anon_1 WHERE anon_1.employee_type IN ([POSTCOMPILE_type_1])",
+ )
+
+
+class SameNamedPropTwoPolymorphicSubClassesTest(fixtures.MappedTest):
+ """test pathing when two subclasses contain a different property
+ for the same name, and polymorphic loading is used.
+
+ #2614
+
+ """
+
+ run_setup_classes = "once"
+ run_setup_mappers = "once"
+ run_inserts = "once"
+ run_deletes = None
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "a",
+ metadata,
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("type", String(10)),
+ )
+ Table(
+ "b",
+ metadata,
+ Column("id", Integer, ForeignKey("a.id"), primary_key=True),
+ )
+ Table(
+ "btod",
+ metadata,
+ Column("bid", Integer, ForeignKey("b.id"), nullable=False),
+ Column("did", Integer, ForeignKey("d.id"), nullable=False),
+ )
+ Table(
+ "c",
+ metadata,
+ Column("id", Integer, ForeignKey("a.id"), primary_key=True),
+ )
+ Table(
+ "ctod",
+ metadata,
+ Column("cid", Integer, ForeignKey("c.id"), nullable=False),
+ Column("did", Integer, ForeignKey("d.id"), nullable=False),
+ )
+ Table(
+ "d",
+ metadata,
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True
+ ),
+ )
+
+ @classmethod
+ def setup_classes(cls):
+ class A(cls.Comparable):
+ pass
+
+ class B(A):
+ pass
+
+ class C(A):
+ pass
+
+ class D(cls.Comparable):
+ pass
+
+ @classmethod
+ def setup_mappers(cls):
+ A = cls.classes.A
+ B = cls.classes.B
+ C = cls.classes.C
+ D = cls.classes.D
+
+ cls.mapper_registry.map_imperatively(
+ A, cls.tables.a, polymorphic_on=cls.tables.a.c.type
+ )
+ cls.mapper_registry.map_imperatively(
+ B,
+ cls.tables.b,
+ inherits=A,
+ polymorphic_identity="b",
+ properties={"related": relationship(D, secondary=cls.tables.btod)},
+ )
+ cls.mapper_registry.map_imperatively(
+ C,
+ cls.tables.c,
+ inherits=A,
+ polymorphic_identity="c",
+ properties={"related": relationship(D, secondary=cls.tables.ctod)},
+ )
+ cls.mapper_registry.map_imperatively(D, cls.tables.d)
+
+ @classmethod
+ def insert_data(cls, connection):
+ B = cls.classes.B
+ C = cls.classes.C
+ D = cls.classes.D
+
+ session = Session(connection)
+
+ d = D()
+ session.add_all([B(related=[d]), C(related=[d])])
+ session.commit()
+
+ def test_fixed_w_poly_subquery(self):
+ A = self.classes.A
+ B = self.classes.B
+ C = self.classes.C
+ D = self.classes.D
+
+ session = fixture_session()
+ d = session.query(D).one()
+
+ def go():
+ # NOTE: subqueryload is broken for this case, first found
+ # when cartesian product detection was added.
+ with expect_deprecated_20(with_polymorphic_dep):
+ for a in (
+ session.query(A)
+ .with_polymorphic([B, C])
+ .options(selectinload(B.related), selectinload(C.related))
+ ):
+ eq_(a.related, [d])
+
+ self.assert_sql_count(testing.db, go, 3)
+
+ def test_fixed_w_poly_joined(self):
+ A = self.classes.A
+ B = self.classes.B
+ C = self.classes.C
+ D = self.classes.D
+
+ session = fixture_session()
+ d = session.query(D).one()
+
+ def go():
+ with expect_deprecated_20(with_polymorphic_dep):
+ for a in (
+ session.query(A)
+ .with_polymorphic([B, C])
+ .options(joinedload(B.related), joinedload(C.related))
+ ):
+ eq_(a.related, [d])
+
+ self.assert_sql_count(testing.db, go, 1)
+
+
+class ConcreteTest(fixtures.MappedTest):
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "companies",
+ metadata,
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("name", String(50)),
+ )
+ Table(
+ "employees",
+ metadata,
+ Column(
+ "employee_id",
+ Integer,
+ primary_key=True,
+ test_needs_autoincrement=True,
+ ),
+ Column("name", String(50)),
+ Column("company_id", Integer, ForeignKey("companies.id")),
+ )
+ Table(
+ "managers",
+ metadata,
+ Column(
+ "employee_id",
+ Integer,
+ primary_key=True,
+ test_needs_autoincrement=True,
+ ),
+ Column("name", String(50)),
+ Column("manager_data", String(50)),
+ Column("company_id", Integer, ForeignKey("companies.id")),
+ )
+ Table(
+ "engineers",
+ metadata,
+ Column(
+ "employee_id",
+ Integer,
+ primary_key=True,
+ test_needs_autoincrement=True,
+ ),
+ Column("name", String(50)),
+ Column("engineer_info", String(50)),
+ Column("company_id", Integer, ForeignKey("companies.id")),
+ )
+ Table(
+ "hackers",
+ metadata,
+ Column(
+ "employee_id",
+ Integer,
+ primary_key=True,
+ test_needs_autoincrement=True,
+ ),
+ Column("name", String(50)),
+ Column("engineer_info", String(50)),
+ Column("company_id", Integer, ForeignKey("companies.id")),
+ Column("nickname", String(50)),
+ )
+
+ @classmethod
+ def setup_classes(cls):
+ class Employee(cls.Basic):
+ def __init__(self, name):
+ self.name = name
+
+ def __repr__(self):
+ return self.__class__.__name__ + " " + self.name
+
+ class Manager(Employee):
+ def __init__(self, name, manager_data):
+ self.name = name
+ self.manager_data = manager_data
+
+ def __repr__(self):
+ return (
+ self.__class__.__name__
+ + " "
+ + self.name
+ + " "
+ + self.manager_data
+ )
+
+ class Engineer(Employee):
+ def __init__(self, name, engineer_info):
+ self.name = name
+ self.engineer_info = engineer_info
+
+ def __repr__(self):
+ return (
+ self.__class__.__name__
+ + " "
+ + self.name
+ + " "
+ + self.engineer_info
+ )
+
+ class Hacker(Engineer):
+ def __init__(self, name, nickname, engineer_info):
+ self.name = name
+ self.nickname = nickname
+ self.engineer_info = engineer_info
+
+ def __repr__(self):
+ return (
+ self.__class__.__name__
+ + " "
+ + self.name
+ + " '"
+ + self.nickname
+ + "' "
+ + self.engineer_info
+ )
+
+ class Company(cls.Basic):
+ pass
+
+ def test_without_default_polymorphic(self):
+ Employee, Engineer, Manager = self.classes(
+ "Employee", "Engineer", "Manager"
+ )
+ (Hacker,) = self.classes("Hacker")
+ (employees_table,) = self.tables("employees")
+ engineers_table, managers_table = self.tables("engineers", "managers")
+ (hackers_table,) = self.tables("hackers")
+
+ pjoin = polymorphic_union(
+ {
+ "employee": employees_table,
+ "manager": managers_table,
+ "engineer": engineers_table,
+ "hacker": hackers_table,
+ },
+ "type",
+ "pjoin",
+ )
+ pjoin2 = polymorphic_union(
+ {"engineer": engineers_table, "hacker": hackers_table},
+ "type",
+ "pjoin2",
+ )
+ employee_mapper = self.mapper_registry.map_imperatively(
+ Employee, employees_table, polymorphic_identity="employee"
+ )
+ self.mapper_registry.map_imperatively(
+ Manager,
+ managers_table,
+ inherits=employee_mapper,
+ concrete=True,
+ polymorphic_identity="manager",
+ )
+ engineer_mapper = self.mapper_registry.map_imperatively(
+ Engineer,
+ engineers_table,
+ inherits=employee_mapper,
+ concrete=True,
+ polymorphic_identity="engineer",
+ )
+ self.mapper_registry.map_imperatively(
+ Hacker,
+ hackers_table,
+ inherits=engineer_mapper,
+ concrete=True,
+ polymorphic_identity="hacker",
+ )
+ session = fixture_session()
+ jdoe = Employee("Jdoe")
+ tom = Manager("Tom", "knows how to manage things")
+ jerry = Engineer("Jerry", "knows how to program")
+ hacker = Hacker("Kurt", "Badass", "knows how to hack")
+ session.add_all((jdoe, tom, jerry, hacker))
+ session.flush()
+
+ with expect_deprecated_20(with_polymorphic_dep):
+ eq_(
+ len(
+ session.connection()
+ .execute(
+ session.query(Employee)
+ .with_polymorphic("*", pjoin, pjoin.c.type)
+ .statement
+ )
+ .fetchall()
+ ),
+ 4,
+ )
+ eq_(session.get(Employee, jdoe.employee_id), jdoe)
+ eq_(session.get(Engineer, jerry.employee_id), jerry)
+ with expect_deprecated_20(with_polymorphic_dep):
+ eq_(
+ set(
+ [
+ repr(x)
+ for x in session.query(Employee).with_polymorphic(
+ "*", pjoin, pjoin.c.type
+ )
+ ]
+ ),
+ set(
+ [
+ "Employee Jdoe",
+ "Engineer Jerry knows how to program",
+ "Manager Tom knows how to manage things",
+ "Hacker Kurt 'Badass' knows how to hack",
+ ]
+ ),
+ )
+ eq_(
+ set([repr(x) for x in session.query(Manager)]),
+ set(["Manager Tom knows how to manage things"]),
+ )
+ with expect_deprecated_20(with_polymorphic_dep):
+ eq_(
+ set(
+ [
+ repr(x)
+ for x in session.query(Engineer).with_polymorphic(
+ "*", pjoin2, pjoin2.c.type
+ )
+ ]
+ ),
+ set(
+ [
+ "Engineer Jerry knows how to program",
+ "Hacker Kurt 'Badass' knows how to hack",
+ ]
+ ),
+ )
+ eq_(
+ set([repr(x) for x in session.query(Hacker)]),
+ set(["Hacker Kurt 'Badass' knows how to hack"]),
+ )
+
+ # test adaption of the column by wrapping the query in a
+ # subquery
+
+ with testing.expect_deprecated(
+ r"The Query.from_self\(\) method", with_polymorphic_dep
+ ):
+ eq_(
+ len(
+ session.connection()
+ .execute(
+ session.query(Engineer)
+ .with_polymorphic("*", pjoin2, pjoin2.c.type)
+ .from_self()
+ .statement
+ )
+ .fetchall()
+ ),
+ 2,
+ )
+ with testing.expect_deprecated(
+ r"The Query.from_self\(\) method", with_polymorphic_dep
+ ):
+ eq_(
+ set(
+ [
+ repr(x)
+ for x in session.query(Engineer)
+ .with_polymorphic("*", pjoin2, pjoin2.c.type)
+ .from_self()
+ ]
+ ),
+ set(
+ [
+ "Engineer Jerry knows how to program",
+ "Hacker Kurt 'Badass' knows how to hack",
+ ]
+ ),
+ )
count = {"": 14, "Polymorphic": 7}.get(self.select_type, 8)
self.assert_sql_count(testing.db, go, count)
- def test_primary_eager_aliasing_three(self):
+ def test_primary_eager_aliasing_three_reset_selectable(self):
+ """test now related to #7262
+ See test_primary_eager_aliasing_three_dont_reset_selectable for the
+ non-reset selectable version.
+
+ """
# assert the JOINs don't over JOIN
sess = fixture_session()
+ # note selectable=None
+ wp = with_polymorphic(Person, "*", None)
+
def go():
eq_(
- sess.query(Person)
- .with_polymorphic("*")
- .order_by(Person.person_id)
- .options(joinedload(Engineer.machines))[1:3],
+ sess.query(wp)
+ .order_by(wp.person_id)
+ .options(joinedload(wp.Engineer.machines))[1:3],
all_employees[1:3],
)
eq_(
sess.scalar(
select(func.count("*")).select_from(
- sess.query(Person)
- .with_polymorphic("*")
- .options(joinedload(Engineer.machines))
- .order_by(Person.person_id)
+ sess.query(wp)
+ .options(joinedload(wp.Engineer.machines))
+ .order_by(wp.person_id)
.limit(2)
.offset(1)
.subquery()
[m1],
)
- def test_join_from_with_polymorphic_nonaliased_one(self):
- sess = fixture_session()
- eq_(
- sess.query(Person)
- .with_polymorphic(Manager)
- .order_by(Person.person_id)
- .join(Person.paperwork)
- .filter(Paperwork.description.like("%review%"))
- .all(),
- [b1, m1],
- )
-
def test_join_from_with_polymorphic_nonaliased_one_future(self):
sess = fixture_session(future=True)
[b1, m1],
)
- def test_join_from_with_polymorphic_nonaliased_two(self):
+ def test_join_from_with_polymorphic_nonaliased_two_future(self):
sess = fixture_session()
+
+ wp = with_polymorphic(Person, [Manager, Engineer])
eq_(
- sess.query(Person)
- .with_polymorphic([Manager, Engineer])
- .order_by(Person.person_id)
- .join(Person.paperwork)
+ sess.query(wp)
+ .order_by(wp.person_id)
+ .join(wp.paperwork)
.filter(Paperwork.description.like("%#2%"))
.all(),
[e1, m1],
)
- def test_join_from_with_polymorphic_nonaliased_three(self):
+ def test_join_from_with_polymorphic_nonaliased_three_future(self):
sess = fixture_session()
+
+ wp = with_polymorphic(Person, [Manager, Engineer])
eq_(
- sess.query(Person)
- .with_polymorphic([Manager, Engineer])
- .order_by(Person.person_id)
- .join(Person.paperwork)
- .filter(Person.name.like("%dog%"))
+ sess.query(wp)
+ .order_by(wp.person_id)
+ .join(wp.paperwork)
+ .filter(wp.name.like("%dog%"))
.filter(Paperwork.description.like("%#2%"))
.all(),
[m1],
)
- def test_join_from_with_polymorphic_explicit_aliased_one(self):
+ def test_join_from_with_polymorphic_explicit_aliased_one_future(self):
sess = fixture_session()
pa = aliased(Paperwork)
+ wp = with_polymorphic(Person, [Manager])
+
eq_(
- sess.query(Person)
- .with_polymorphic(Manager)
- .join(pa, Person.paperwork)
+ sess.query(wp)
+ .join(pa, wp.paperwork)
.filter(pa.description.like("%review%"))
.all(),
[b1, m1],
)
- def test_join_from_with_polymorphic_explicit_aliased_two(self):
+ def test_join_from_with_polymorphic_explicit_aliased_two_future(self):
sess = fixture_session()
pa = aliased(Paperwork)
+
+ wp = with_polymorphic(Person, [Manager, Engineer])
eq_(
- sess.query(Person)
- .with_polymorphic([Manager, Engineer])
- .order_by(Person.person_id)
- .join(pa, Person.paperwork)
+ sess.query(wp)
+ .order_by(wp.person_id)
+ .join(pa, wp.paperwork)
.filter(pa.description.like("%#2%"))
.all(),
[e1, m1],
)
- def test_join_from_with_polymorphic_aliased_three(self):
+ def test_join_from_with_polymorphic_ot_explicit_aliased_two_future(self):
sess = fixture_session()
pa = aliased(Paperwork)
+ wp = with_polymorphic(Person, [Manager, Engineer])
eq_(
- sess.query(Person)
- .with_polymorphic([Manager, Engineer])
- .order_by(Person.person_id)
- .join(pa, Person.paperwork)
- .filter(Person.name.like("%dog%"))
+ sess.query(wp)
+ .order_by(wp.person_id)
+ .join(wp.paperwork.of_type(pa))
+ .filter(pa.description.like("%#2%"))
+ .all(),
+ [e1, m1],
+ )
+
+ def test_join_from_with_polymorphic_aliased_three_future(self):
+ sess = fixture_session()
+ pa = aliased(Paperwork)
+ wp = with_polymorphic(Person, [Manager, Engineer])
+
+ eq_(
+ sess.query(wp)
+ .order_by(wp.person_id)
+ .join(pa, wp.paperwork)
+ .filter(wp.name.like("%dog%"))
+ .filter(pa.description.like("%#2%"))
+ .all(),
+ [m1],
+ )
+
+ def test_join_from_with_polymorphic_ot_aliased_three_future(self):
+ sess = fixture_session()
+ pa = aliased(Paperwork)
+ wp = with_polymorphic(Person, [Manager, Engineer])
+
+ eq_(
+ sess.query(wp)
+ .order_by(wp.person_id)
+ .join(wp.paperwork.of_type(pa))
+ .filter(wp.name.like("%dog%"))
.filter(pa.description.like("%#2%"))
.all(),
[m1],
sess.expire(m2, ["manager_name", "golf_swing"])
assert m2.golf_swing == "fore"
- def test_with_polymorphic_one(self):
+ def test_with_polymorphic_one_future(self):
sess = fixture_session()
def go():
+ wp = with_polymorphic(Person, [Engineer])
eq_(
- sess.query(Person)
- .with_polymorphic(Engineer)
- .filter(Engineer.primary_language == "java")
+ sess.query(wp)
+ .filter(wp.Engineer.primary_language == "java")
.all(),
self._emps_wo_relationships_fixture()[0:1],
)
self.assert_sql_count(testing.db, go, 1)
- def test_with_polymorphic_two(self):
+ def test_with_polymorphic_two_future_adhoc_wp(self):
+ """test #7262
+
+ compare to
+ test_with_polymorphic_two_future_default_wp
+
+ """
+
sess = fixture_session()
def go():
+
+ wp = with_polymorphic(Person, "*", selectable=None)
eq_(
- sess.query(Person)
- .with_polymorphic("*")
- .order_by(Person.person_id)
- .all(),
+ sess.query(wp).order_by(wp.person_id).all(),
self._emps_wo_relationships_fixture(),
)
self.assert_sql_count(testing.db, go, 1)
- def test_with_polymorphic_three(self):
+ def test_with_polymorphic_three_future(self):
sess = fixture_session()
def go():
+ wp = with_polymorphic(Person, [Engineer])
+
eq_(
- sess.query(Person)
- .with_polymorphic(Engineer)
- .order_by(Person.person_id)
- .all(),
+ sess.query(wp).order_by(wp.person_id).all(),
self._emps_wo_relationships_fixture(),
)
self.assert_sql_count(testing.db, go, 3)
- def test_with_polymorphic_four(self):
+ def test_with_polymorphic_four_future(self):
sess = fixture_session()
def go():
+ wp = with_polymorphic(
+ Person, Engineer, selectable=people.outerjoin(engineers)
+ )
eq_(
- sess.query(Person)
- .with_polymorphic(Engineer, people.outerjoin(engineers))
- .order_by(Person.person_id)
- .all(),
+ sess.query(wp).order_by(wp.person_id).all(),
self._emps_wo_relationships_fixture(),
)
self.assert_sql_count(testing.db, go, 3)
- def test_with_polymorphic_five(self):
+ def test_with_polymorphic_five_future_override_selectable(self):
+ """test part of #7262
+
+ this is kind of a hack though, people wouldn't know to do this
+ this way.
+
+ """
sess = fixture_session()
def go():
+ # needs both [Person] and the selectable=None part
+ # TODO: why do we need [Person] and can't send []? possible
+ # bug
+ wp = with_polymorphic(Person, [Person], selectable=None)
+
# limit the polymorphic join down to just "Person",
# overriding select_table
eq_(
- sess.query(Person).with_polymorphic(Person).all(),
+ sess.query(wp).all(),
self._emps_wo_relationships_fixture(),
)
self.assert_sql_count(testing.db, go, 6)
- def test_with_polymorphic_six(self):
- sess = fixture_session()
-
+ def test_with_polymorphic_six_future(self):
assert_raises(
- sa_exc.InvalidRequestError,
- sess.query(Person).with_polymorphic,
- Paperwork,
+ sa_exc.InvalidRequestError, with_polymorphic, Person, [Paperwork]
)
assert_raises(
- sa_exc.InvalidRequestError,
- sess.query(Engineer).with_polymorphic,
- Boss,
+ sa_exc.InvalidRequestError, with_polymorphic, Engineer, [Boss]
)
assert_raises(
- sa_exc.InvalidRequestError,
- sess.query(Engineer).with_polymorphic,
- Person,
+ sa_exc.InvalidRequestError, with_polymorphic, Engineer, [Person]
)
- def test_with_polymorphic_seven(self):
+ def test_with_polymorphic_seven_future(self):
sess = fixture_session()
# compare to entities without related collections to prevent
# additional lazy SQL from firing on loaded entities
+ wp = with_polymorphic(Person, "*")
eq_(
- sess.query(Person)
- .with_polymorphic("*")
- .order_by(Person.person_id)
- .all(),
+ sess.query(wp).order_by(wp.person_id).all(),
self._emps_wo_relationships_fixture(),
)
def go():
# test load People with joinedload to engineers + machines
+ wp = with_polymorphic(Person, "*")
eq_(
- sess.query(Person)
- .with_polymorphic("*")
- .options(joinedload(Engineer.machines))
- .filter(Person.name == "dilbert")
+ sess.query(wp)
+ .options(joinedload(wp.Engineer.machines))
+ .filter(wp.name == "dilbert")
.all(),
expected,
)
expected,
)
- # the old version of this test has never worked, apparently,
- # was always spitting out a cartesian product. Since we
- # are getting rid of query.with_polymorphic() is it not
- # worth fixing.
- # eq_(
- # sess.query(Person)
- # .with_polymorphic("*")
- # .options(subqueryload(Engineer.machines))
- # .filter(Person.name == "dilbert")
- # .all(),
- # expected,
- # )
-
self.assert_sql_count(testing.db, go, 2)
def test_query_subclass_join_to_base_relationship(self):
class PolymorphicTest(_PolymorphicTestBase, _Polymorphic):
+ def test_primary_eager_aliasing_three_dont_reset_selectable(self):
+ """test now related to #7262
+
+ See test_primary_eager_aliasing_three_reset_selectable for
+ the reset selectable version.
+
+ """
+ # assert the JOINs don't over JOIN
+
+ sess = fixture_session()
+
+ # selectable default is False
+ wp = with_polymorphic(Person, "*")
+
+ def go():
+ eq_(
+ sess.query(wp)
+ .order_by(wp.person_id)
+ .options(joinedload(wp.Engineer.machines))[1:3],
+ all_employees[1:3],
+ )
+
+ self.assert_sql_count(testing.db, go, 3)
+
+ eq_(
+ sess.scalar(
+ select(func.count("*")).select_from(
+ sess.query(wp)
+ .options(joinedload(wp.Engineer.machines))
+ .order_by(wp.person_id)
+ .limit(2)
+ .offset(1)
+ .subquery()
+ )
+ ),
+ 2,
+ )
+
+ def test_with_polymorphic_two_future_default_wp(self):
+ """test #7262
+
+ compare to
+ test_with_polymorphic_two_future_adhoc_wp
+
+ """
+ sess = fixture_session()
+
+ def go():
+
+ wp = with_polymorphic(Person, "*")
+ eq_(
+ sess.query(wp).order_by(wp.person_id).all(),
+ self._emps_wo_relationships_fixture(),
+ )
+
+ self.assert_sql_count(testing.db, go, 1)
+
def test_join_to_subclass_four(self):
sess = fixture_session()
eq_(
):
__dialect__ = "default"
+ def test_with_polymorphic_two_future_default_wp(self):
+ """test #7262
+
+ compare to
+ test_with_polymorphic_two_future_adhoc_wp
+
+ """
+ sess = fixture_session()
+
+ def go():
+
+ wp = with_polymorphic(Person, "*")
+ eq_(
+ sess.query(wp).order_by(wp.person_id).all(),
+ self._emps_wo_relationships_fixture(),
+ )
+
+ self.assert_sql_count(testing.db, go, 1)
+
def test_aliased_not_polluted_by_join(self):
# aliased(polymorphic) will normally do the old-school
# "(SELECT * FROM a JOIN b ...) AS anon_1" thing.
class PolymorphicUnionsTest(_PolymorphicTestBase, _PolymorphicUnions):
+ def test_with_polymorphic_two_future_default_wp(self):
+ """test #7262
+
+ compare to
+ test_with_polymorphic_two_future_adhoc_wp
+
+ """
+ sess = fixture_session()
+
+ def go():
+
+ wp = with_polymorphic(Person, "*")
+ eq_(
+ sess.query(wp).order_by(wp.person_id).all(),
+ self._emps_wo_relationships_fixture(),
+ )
+
+ self.assert_sql_count(testing.db, go, 2)
+
def test_subqueryload_on_subclass_uses_path_correctly(self):
sess = fixture_session()
expected = [
class PolymorphicAliasedJoinsTest(
_PolymorphicTestBase, _PolymorphicAliasedJoins
):
- pass
+ def test_with_polymorphic_two_future_default_wp(self):
+ """test #7262
+
+ compare to
+ test_with_polymorphic_two_future_adhoc_wp
+
+ """
+ sess = fixture_session()
+
+ def go():
+
+ wp = with_polymorphic(Person, "*")
+ eq_(
+ sess.query(wp).order_by(wp.person_id).all(),
+ self._emps_wo_relationships_fixture(),
+ )
+
+ self.assert_sql_count(testing.db, go, 2)
class PolymorphicJoinsTest(_PolymorphicTestBase, _PolymorphicJoins):
+ def test_with_polymorphic_two_future_default_wp(self):
+ """test #7262
+
+ compare to
+ test_with_polymorphic_two_future_adhoc_wp
+
+ """
+ sess = fixture_session()
+
+ def go():
+
+ wp = with_polymorphic(Person, "*")
+ eq_(
+ sess.query(wp).order_by(wp.person_id).all(),
+ self._emps_wo_relationships_fixture(),
+ )
+
+ self.assert_sql_count(testing.db, go, 2)
+
def test_having_group_by(self):
sess = fixture_session()
eq_(
from sqlalchemy.orm import contains_eager
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import relationship
-from sqlalchemy.orm import selectinload
from sqlalchemy.orm import Session
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import subqueryload
self.assert_sql_count(testing.db, go, 3)
- def test_fixed_w_poly_subquery(self):
- A = self.classes.A
- B = self.classes.B
- C = self.classes.C
- D = self.classes.D
-
- session = fixture_session()
- d = session.query(D).one()
-
- def go():
- # NOTE: subqueryload is broken for this case, first found
- # when cartesian product detection was added.
- for a in (
- session.query(A)
- .with_polymorphic([B, C])
- .options(selectinload(B.related), selectinload(C.related))
- ):
- eq_(a.related, [d])
-
- self.assert_sql_count(testing.db, go, 3)
-
def test_free_w_poly_joined(self):
A = self.classes.A
B = self.classes.B
self.assert_sql_count(testing.db, go, 1)
- def test_fixed_w_poly_joined(self):
- A = self.classes.A
- B = self.classes.B
- C = self.classes.C
- D = self.classes.D
-
- session = fixture_session()
- d = session.query(D).one()
-
- def go():
- for a in (
- session.query(A)
- .with_polymorphic([B, C])
- .options(joinedload(B.related), joinedload(C.related))
- ):
- eq_(a.related, [d])
-
- self.assert_sql_count(testing.db, go, 1)
-
class SubClassToSubClassFromParentTest(fixtures.MappedTest):
"""test #2617"""
sess.expunge_all()
def go():
+ wp = with_polymorphic(Person, "*")
eq_(
- sess.query(Person)
- .with_polymorphic("*")
- .order_by(Person.person_id)
- .all(),
+ sess.query(wp).order_by(wp.person_id).all(),
[
Person(name="p1"),
Employee(name="e1", employee_data="ed1"),
poly = self._with_poly_fixture()
s = fixture_session()
- q = s.query(Boss).with_polymorphic(Boss, poly)
+
+ wp = with_polymorphic(Boss, [], poly)
+ q = s.query(wp)
self.assert_compile(
q,
"SELECT anon_1.employee_id AS anon_1_employee_id, "
"Person", "Manager", "Engineer", "Boss"
)
- def one():
- return (
- fixture_session()
- .query(Person)
- .with_polymorphic([Manager, Engineer])
- )
-
def two():
wp = with_polymorphic(Person, [Manager, Engineer])
return fixture_session().query(wp).filter(wp.name == "asdfo")
- def four():
- return (
- fixture_session()
- .query(Person)
- .with_polymorphic([Manager, Engineer])
- .filter(Person.name == "asdf")
- )
-
def five():
subq = (
select(Person)
return fixture_session().query(wp).filter(wp.name == "asdfo")
- def six():
- subq = (
- select(Person)
- .outerjoin(Manager)
- .outerjoin(Engineer)
- .subquery()
- )
-
- return (
- fixture_session()
- .query(Person)
- .with_polymorphic([Manager, Engineer], subq)
- .filter(Person.name == "asdfo")
- )
-
self._run_cache_key_fixture(
- lambda: stmt_20(
- one(), two(), three(), three_a(), four(), five(), six()
- ),
+ lambda: stmt_20(two(), three(), three_a(), five()),
compare_values=True,
)
from sqlalchemy import true
from sqlalchemy import util
from sqlalchemy.engine import default
+from sqlalchemy.engine.base import Engine
from sqlalchemy.orm import aliased
from sqlalchemy.orm import as_declarative
from sqlalchemy.orm import attributes
"arguments in SQLAlchemy 2.0."
)
+autocommit_dep = (
+ "The Session.autocommit parameter is deprecated "
+ "and will be removed in SQLAlchemy version 2.0."
+)
+
+subtransactions_dep = (
+ "The Session.begin.subtransactions flag is deprecated "
+ "and will be removed in SQLAlchemy version 2.0."
+)
opt_strings_dep = (
"Using strings to indicate column or relationship "
"paths in loader options"
"legacy as of the 1.x"
)
+with_polymorphic_dep = (
+ r"The Query.with_polymorphic\(\) method is considered legacy as of "
+ r"the 1.x series of SQLAlchemy and will be removed in 2.0"
+)
+
def _aliased_join_warning(arg=None):
return testing.expect_warnings(
s1 = Session(testing.db)
s1.begin()
- with testing.expect_deprecated_20(
- "The Session.begin.subtransactions flag is deprecated "
- "and will be removed in SQLAlchemy version 2.0."
- ):
+ with testing.expect_deprecated_20(subtransactions_dep):
s1.begin(subtransactions=True)
s1.close()
def test_autocommit_deprecated(Self):
- with testing.expect_deprecated_20(
- "The Session.autocommit parameter is deprecated "
- "and will be removed in SQLAlchemy version 2.0."
- ):
+ with testing.expect_deprecated_20(autocommit_dep):
Session(autocommit=True)
@testing.combinations(
eq_(sess.query(User).count(), 1)
+class TransScopingTest(_fixtures.FixtureTest):
+ run_inserts = None
+ __prefer_requires__ = ("independent_connections",)
+
+ @testing.combinations((True,), (False,), argnames="begin")
+ @testing.combinations((True,), (False,), argnames="expire_on_commit")
+ @testing.combinations((True,), (False,), argnames="modify_unconditional")
+ @testing.combinations(
+ ("nothing",), ("modify",), ("add",), ("delete",), argnames="case_"
+ )
+ def test_autobegin_attr_change(
+ self, case_, begin, modify_unconditional, expire_on_commit
+ ):
+ """test :ticket:`6360`"""
+
+ autocommit = True
+ User, users = self.classes.User, self.tables.users
+
+ self.mapper_registry.map_imperatively(User, users)
+
+ with testing.expect_deprecated_20(autocommit_dep):
+ s = Session(
+ testing.db,
+ autocommit=autocommit,
+ expire_on_commit=expire_on_commit,
+ )
+
+ u = User(name="x")
+ u2 = User(name="d")
+ u3 = User(name="e")
+ s.add_all([u, u2, u3])
+
+ if autocommit:
+ s.flush()
+ else:
+ s.commit()
+
+ if begin:
+ s.begin()
+
+ if case_ == "add":
+ # this autobegins
+ s.add(User(name="q"))
+ elif case_ == "delete":
+ # this autobegins
+ s.delete(u2)
+ elif case_ == "modify":
+ # this autobegins
+ u3.name = "m"
+
+ if case_ == "nothing" and not begin:
+ assert not s._transaction
+ expect_expire = expire_on_commit
+ elif autocommit and not begin:
+ assert not s._transaction
+ expect_expire = expire_on_commit
+ else:
+ assert s._transaction
+ expect_expire = True
+
+ if modify_unconditional:
+ # this autobegins
+ u.name = "y"
+ expect_expire = True
+
+ if not expect_expire:
+ assert not s._transaction
+
+ # test is that state is consistent after rollback()
+ s.rollback()
+
+ if autocommit and not begin and modify_unconditional:
+ eq_(u.name, "y")
+ else:
+ if not expect_expire:
+ assert "name" in u.__dict__
+ else:
+ assert "name" not in u.__dict__
+ eq_(u.name, "x")
+
+ def test_no_autoflush_or_commit_in_expire_w_autocommit(self):
+ """test second part of :ticket:`6233`.
+
+ Here we test that the "autoflush on unexpire" feature added
+ in :ticket:`5226` is turned off for a legacy autocommit session.
+
+ """
+
+ with testing.expect_deprecated_20(autocommit_dep):
+ s = Session(
+ testing.db,
+ autocommit=True,
+ expire_on_commit=True,
+ autoflush=True,
+ )
+
+ User, users = self.classes.User, self.tables.users
+
+ self.mapper_registry.map_imperatively(User, users)
+
+ u1 = User(name="u1")
+ s.add(u1)
+ s.flush() # this commits
+
+ u1.name = "u2" # this does not commit
+
+ assert "id" not in u1.__dict__
+ u1.id # this unexpires
+
+ # never expired
+ eq_(u1.__dict__["name"], "u2")
+
+ eq_(u1.name, "u2")
+
+ # still in dirty collection
+ assert u1 in s.dirty
+
+
class AutocommitClosesOnFailTest(fixtures.MappedTest):
__requires__ = ("deferrable_fks",)
def test_close_transaction_on_commit_fail(self):
T2 = self.classes.T2
- session = Session(testing.db, autocommit=True)
+ with testing.expect_deprecated_20(autocommit_dep):
+ session = Session(testing.db, autocommit=True)
# with a deferred constraint, this fails at COMMIT time instead
# of at INSERT time.
)
+class AutoCommitTest(_LocalFixture):
+ __backend__ = True
+
+ def test_begin_nested_requires_trans(self):
+ with assertions.expect_deprecated_20(autocommit_dep):
+ sess = fixture_session(autocommit=True)
+ assert_raises(sa_exc.InvalidRequestError, sess.begin_nested)
+
+ def test_begin_preflush(self):
+ User = self.classes.User
+ with assertions.expect_deprecated_20(autocommit_dep):
+ sess = fixture_session(autocommit=True)
+
+ u1 = User(name="ed")
+ sess.add(u1)
+
+ sess.begin()
+ u2 = User(name="some other user")
+ sess.add(u2)
+ sess.rollback()
+ assert u2 not in sess
+ assert u1 in sess
+ assert sess.query(User).filter_by(name="ed").one() is u1
+
+ def test_accounting_commit_fails_add(self):
+ User = self.classes.User
+ with assertions.expect_deprecated_20(autocommit_dep):
+ sess = fixture_session(autocommit=True)
+
+ fail = False
+
+ def fail_fn(*arg, **kw):
+ if fail:
+ raise Exception("commit fails")
+
+ event.listen(sess, "after_flush_postexec", fail_fn)
+ u1 = User(name="ed")
+ sess.add(u1)
+
+ fail = True
+ assert_raises(Exception, sess.flush)
+ fail = False
+
+ assert u1 not in sess
+ u1new = User(id=2, name="fred")
+ sess.add(u1new)
+ sess.add(u1)
+ sess.flush()
+ assert u1 in sess
+ eq_(
+ sess.query(User.name).order_by(User.name).all(),
+ [("ed",), ("fred",)],
+ )
+
+ def test_accounting_commit_fails_delete(self):
+ User = self.classes.User
+ with assertions.expect_deprecated_20(autocommit_dep):
+ sess = fixture_session(autocommit=True)
+
+ fail = False
+
+ def fail_fn(*arg, **kw):
+ if fail:
+ raise Exception("commit fails")
+
+ event.listen(sess, "after_flush_postexec", fail_fn)
+ u1 = User(name="ed")
+ sess.add(u1)
+ sess.flush()
+
+ sess.delete(u1)
+ fail = True
+ assert_raises(Exception, sess.flush)
+ fail = False
+
+ assert u1 in sess
+ assert u1 not in sess.deleted
+ sess.delete(u1)
+ sess.flush()
+ assert u1 not in sess
+ eq_(sess.query(User.name).order_by(User.name).all(), [])
+
+ @testing.requires.updateable_autoincrement_pks
+ def test_accounting_no_select_needed(self):
+ """test that flush accounting works on non-expired instances
+ when autocommit=True/expire_on_commit=True."""
+
+ User = self.classes.User
+ with assertions.expect_deprecated_20(autocommit_dep):
+ sess = fixture_session(autocommit=True, expire_on_commit=True)
+
+ u1 = User(id=1, name="ed")
+ sess.add(u1)
+ sess.flush()
+
+ u1.id = 3
+ u1.name = "fred"
+ self.assert_sql_count(testing.db, sess.flush, 1)
+ assert "id" not in u1.__dict__
+ eq_(u1.id, 3)
+
+
+class SessionStateTest(_fixtures.FixtureTest):
+ run_inserts = None
+
+ __prefer_requires__ = ("independent_connections",)
+
+ def test_autocommit_doesnt_raise_on_pending(self):
+ User, users = self.classes.User, self.tables.users
+
+ self.mapper_registry.map_imperatively(User, users)
+ with assertions.expect_deprecated_20(autocommit_dep):
+ session = Session(testing.db, autocommit=True)
+
+ session.add(User(name="ed"))
+
+ session.begin()
+ session.flush()
+ session.commit()
+
+
+class SessionTransactionTest(fixtures.RemovesEvents, _fixtures.FixtureTest):
+ run_inserts = None
+ __backend__ = True
+
+ @testing.fixture
+ def conn(self):
+ with testing.db.connect() as conn:
+ yield conn
+
+ @testing.fixture
+ def future_conn(self):
+
+ engine = Engine._future_facade(testing.db)
+ with engine.connect() as conn:
+ yield conn
+
+ def test_deactive_status_check(self):
+ sess = fixture_session()
+ trans = sess.begin()
+
+ with assertions.expect_deprecated_20(subtransactions_dep):
+ trans2 = sess.begin(subtransactions=True)
+ trans2.rollback()
+ assert_raises_message(
+ sa_exc.InvalidRequestError,
+ "This session is in 'inactive' state, due to the SQL transaction "
+ "being rolled back; no further SQL can be emitted within this "
+ "transaction.",
+ trans.commit,
+ )
+
+ def test_deactive_status_check_w_exception(self):
+ sess = fixture_session()
+ trans = sess.begin()
+ with assertions.expect_deprecated_20(subtransactions_dep):
+ trans2 = sess.begin(subtransactions=True)
+ try:
+ raise Exception("test")
+ except Exception:
+ trans2.rollback(_capture_exception=True)
+ assert_raises_message(
+ sa_exc.PendingRollbackError,
+ r"This Session's transaction has been rolled back due to a "
+ r"previous exception during flush. To begin a new transaction "
+ r"with this Session, first issue Session.rollback\(\). "
+ r"Original exception was: test",
+ trans.commit,
+ )
+
+ def test_error_on_using_inactive_session_commands(self):
+ users, User = self.tables.users, self.classes.User
+
+ self.mapper_registry.map_imperatively(User, users)
+ with assertions.expect_deprecated_20(autocommit_dep):
+ sess = fixture_session(autocommit=True)
+ sess.begin()
+ with assertions.expect_deprecated_20(subtransactions_dep):
+ sess.begin(subtransactions=True)
+ sess.add(User(name="u1"))
+ sess.flush()
+ sess.rollback()
+ with assertions.expect_deprecated_20(subtransactions_dep):
+ assert_raises_message(
+ sa_exc.InvalidRequestError,
+ "This session is in 'inactive' state, due to the SQL "
+ "transaction "
+ "being rolled back; no further SQL can be emitted within this "
+ "transaction.",
+ sess.begin,
+ subtransactions=True,
+ )
+ sess.close()
+
+ def test_subtransaction_on_external_subtrans(self, conn):
+ users, User = self.tables.users, self.classes.User
+
+ self.mapper_registry.map_imperatively(User, users)
+
+ trans = conn.begin()
+ sess = Session(bind=conn, autocommit=False, autoflush=True)
+ with assertions.expect_deprecated_20(subtransactions_dep):
+ sess.begin(subtransactions=True)
+ u = User(name="ed")
+ sess.add(u)
+ sess.flush()
+ sess.commit() # commit does nothing
+ trans.rollback() # rolls back
+ assert len(sess.query(User).all()) == 0
+ sess.close()
+
+ def test_subtransaction_on_noautocommit(self):
+ User, users = self.classes.User, self.tables.users
+
+ self.mapper_registry.map_imperatively(User, users)
+ sess = fixture_session(autocommit=False, autoflush=True)
+ with assertions.expect_deprecated_20(subtransactions_dep):
+ sess.begin(subtransactions=True)
+ u = User(name="u1")
+ sess.add(u)
+ sess.flush()
+ sess.commit() # commit does nothing
+ sess.rollback() # rolls back
+ assert len(sess.query(User).all()) == 0
+ sess.close()
+
+ @testing.requires.savepoints
+ def test_heavy_nesting(self):
+ users = self.tables.users
+
+ session = fixture_session()
+ session.begin()
+ session.connection().execute(users.insert().values(name="user1"))
+ with assertions.expect_deprecated_20(subtransactions_dep):
+ session.begin(subtransactions=True)
+ session.begin_nested()
+ session.connection().execute(users.insert().values(name="user2"))
+ assert (
+ session.connection()
+ .exec_driver_sql("select count(1) from users")
+ .scalar()
+ == 2
+ )
+ session.rollback()
+ assert (
+ session.connection()
+ .exec_driver_sql("select count(1) from users")
+ .scalar()
+ == 1
+ )
+ session.connection().execute(users.insert().values(name="user3"))
+ session.commit()
+ assert (
+ session.connection()
+ .exec_driver_sql("select count(1) from users")
+ .scalar()
+ == 2
+ )
+
+ @testing.requires.savepoints
+ def test_heavy_nesting_future(self):
+ users = self.tables.users
+
+ from sqlalchemy.future import Engine
+
+ engine = Engine._future_facade(testing.db)
+ with Session(engine, autocommit=False) as session:
+ session.begin()
+ session.connection().execute(users.insert().values(name="user1"))
+ with assertions.expect_deprecated_20(subtransactions_dep):
+ session.begin(subtransactions=True)
+ session.begin_nested()
+ session.connection().execute(users.insert().values(name="user2"))
+ assert (
+ session.connection()
+ .exec_driver_sql("select count(1) from users")
+ .scalar()
+ == 2
+ )
+ session.rollback()
+ assert (
+ session.connection()
+ .exec_driver_sql("select count(1) from users")
+ .scalar()
+ == 1
+ )
+ session.connection().execute(users.insert().values(name="user3"))
+ session.commit()
+ assert (
+ session.connection()
+ .exec_driver_sql("select count(1) from users")
+ .scalar()
+ == 2
+ )
+
+ @testing.requires.savepoints
+ def test_mixed_transaction_control(self):
+ users, User = self.tables.users, self.classes.User
+
+ self.mapper_registry.map_imperatively(User, users)
+
+ with assertions.expect_deprecated_20(autocommit_dep):
+ sess = fixture_session(autocommit=True)
+
+ sess.begin()
+ sess.begin_nested()
+ with assertions.expect_deprecated_20(subtransactions_dep):
+ transaction = sess.begin(subtransactions=True)
+
+ sess.add(User(name="u1"))
+
+ transaction.commit()
+ sess.commit()
+ sess.commit()
+
+ sess.close()
+
+ eq_(len(sess.query(User).all()), 1)
+
+ t1 = sess.begin()
+ t2 = sess.begin_nested()
+
+ sess.add(User(name="u2"))
+
+ t2.commit()
+ assert sess._legacy_transaction() is t1
+
+ sess.close()
+
+ @testing.requires.savepoints
+ def test_nested_transaction_connection_add_autocommit(self):
+ users, User = self.tables.users, self.classes.User
+
+ self.mapper_registry.map_imperatively(User, users)
+
+ with assertions.expect_deprecated_20(autocommit_dep):
+ sess = fixture_session(autocommit=True)
+
+ sess.begin()
+ sess.begin_nested()
+
+ u1 = User(name="u1")
+ sess.add(u1)
+ sess.flush()
+
+ sess.rollback()
+
+ u2 = User(name="u2")
+ sess.add(u2)
+
+ sess.commit()
+
+ eq_(set(sess.query(User).all()), set([u2]))
+
+ sess.begin()
+ sess.begin_nested()
+
+ u3 = User(name="u3")
+ sess.add(u3)
+ sess.commit() # commit the nested transaction
+ sess.rollback()
+
+ eq_(set(sess.query(User).all()), set([u2]))
+
+ sess.close()
+
+ def test_active_flag_autocommit(self):
+ with assertions.expect_deprecated_20(autocommit_dep):
+ sess = Session(bind=testing.db, autocommit=True)
+ assert not sess.is_active
+ sess.begin()
+ assert sess.is_active
+ sess.rollback()
+ assert not sess.is_active
+
+
class SessionEventsTest(_RemoveListeners, _fixtures.FixtureTest):
run_inserts = None
)
+class PolyCacheKeyTest(CacheKeyFixture, _poly_fixtures._Polymorphic):
+ run_setup_mappers = "once"
+ run_inserts = None
+ run_deletes = None
+
+ def _stmt_20(self, *elements):
+ return tuple(
+ elem._statement_20() if isinstance(elem, sa.orm.Query) else elem
+ for elem in elements
+ )
+
+ def test_wp_queries(self):
+ Person, Manager, Engineer, Boss = self.classes(
+ "Person", "Manager", "Engineer", "Boss"
+ )
+
+ def one():
+ with assertions.expect_deprecated_20(w_polymorphic_dep):
+ return (
+ fixture_session()
+ .query(Person)
+ .with_polymorphic([Manager, Engineer])
+ )
+
+ def two():
+ wp = with_polymorphic(Person, [Manager, Engineer])
+
+ return fixture_session().query(wp)
+
+ def three():
+ wp = with_polymorphic(Person, [Manager, Engineer])
+
+ return fixture_session().query(wp).filter(wp.name == "asdfo")
+
+ def three_a():
+ wp = with_polymorphic(Person, [Manager, Engineer], flat=True)
+
+ return fixture_session().query(wp).filter(wp.name == "asdfo")
+
+ def four():
+ with assertions.expect_deprecated_20(w_polymorphic_dep):
+ return (
+ fixture_session()
+ .query(Person)
+ .with_polymorphic([Manager, Engineer])
+ .filter(Person.name == "asdf")
+ )
+
+ def five():
+ subq = (
+ select(Person)
+ .outerjoin(Manager)
+ .outerjoin(Engineer)
+ .subquery()
+ )
+ wp = with_polymorphic(Person, [Manager, Engineer], subq)
+
+ return fixture_session().query(wp).filter(wp.name == "asdfo")
+
+ def six():
+ subq = (
+ select(Person)
+ .outerjoin(Manager)
+ .outerjoin(Engineer)
+ .subquery()
+ )
+
+ with assertions.expect_deprecated_20(w_polymorphic_dep):
+ return (
+ fixture_session()
+ .query(Person)
+ .with_polymorphic([Manager, Engineer], subq)
+ .filter(Person.name == "asdfo")
+ )
+
+ self._run_cache_key_fixture(
+ lambda: self._stmt_20(
+ one(), two(), three(), three_a(), four(), five(), six()
+ ),
+ compare_values=True,
+ )
+
+
class AliasedClassRelationshipTest(
PartitionByFixture, testing.AssertsCompiledSQL
):
)
s = Session(testing.db)
+ fp = with_polymorphic(Foo, [Bar, Baz])
+
def go():
eq_(
- s.query(Foo)
- .with_polymorphic([Bar, Baz])
- .order_by(Foo.id)
- .options(subqueryload(Foo.related))
+ s.query(fp)
+ .order_by(fp.id)
+ .options(subqueryload(fp.related))
.all(),
[
Bar(id=1, related=Related(id=1)),
)
s = Session(testing.db)
+ fp = with_polymorphic(Foo, [Bar, Baz])
+
def go():
eq_(
- s.query(Foo)
- .with_polymorphic([Bar, Baz])
- .order_by(Foo.id)
- .options(joinedload(Foo.related))
+ s.query(fp)
+ .order_by(fp.id)
+ .options(joinedload(fp.related))
.all(),
[
Bar(id=1, related=Related(id=1)),