from sqlalchemy import select
from sqlalchemy import testing
from sqlalchemy import text
+from sqlalchemy import union
from sqlalchemy import util
from sqlalchemy.orm import aliased
from sqlalchemy.orm import column_property
"FROM users JOIN anon_1 ON users.id = anon_1.id",
)
+ def test_nested_union_deferred(self, deferred_fixture):
+ """test #6678"""
+ User = deferred_fixture
+
+ s1 = select(User).where(User.id == 5)
+ s2 = select(User).where(User.id == 6)
+
+ s3 = select(User).where(User.id == 7)
+
+ stmt = union(s1.union(s2), s3)
+
+ u_alias = aliased(User, stmt.subquery())
+
+ self.assert_compile(
+ select(u_alias),
+ "SELECT anon_1.id FROM ((SELECT users.name, users.id FROM users "
+ "WHERE users.id = :id_1 UNION SELECT users.name, users.id "
+ "FROM users WHERE users.id = :id_2) "
+ "UNION SELECT users.name AS name, users.id AS id "
+ "FROM users WHERE users.id = :id_3) AS anon_1",
+ )
+
+ def test_nested_union_undefer_option(self, deferred_fixture):
+ """test #6678
+
+ in this case we want to see that the unions include the deferred
+ columns so that if we undefer on the outside we can get the
+ column.
+
+ """
+ User = deferred_fixture
+
+ s1 = select(User).where(User.id == 5)
+ s2 = select(User).where(User.id == 6)
+
+ s3 = select(User).where(User.id == 7)
+
+ stmt = union(s1.union(s2), s3)
+
+ u_alias = aliased(User, stmt.subquery())
+
+ self.assert_compile(
+ select(u_alias).options(undefer(u_alias.name)),
+ "SELECT anon_1.name, anon_1.id FROM "
+ "((SELECT users.name, users.id FROM users "
+ "WHERE users.id = :id_1 UNION SELECT users.name, users.id "
+ "FROM users WHERE users.id = :id_2) "
+ "UNION SELECT users.name AS name, users.id AS id "
+ "FROM users WHERE users.id = :id_3) AS anon_1",
+ )
+
class ExtraColsTest(QueryTest, AssertsCompiledSQL):
__dialect__ = "default"
from sqlalchemy.orm import column_property
from sqlalchemy.orm import contains_eager
from sqlalchemy.orm import defer
+from sqlalchemy.orm import deferred
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import lazyload
from sqlalchemy.orm import mapper
from sqlalchemy.orm import Session
from sqlalchemy.orm import subqueryload
from sqlalchemy.orm import synonym
+from sqlalchemy.orm import undefer
from sqlalchemy.orm.context import QueryContext
from sqlalchemy.orm.util import join
from sqlalchemy.orm.util import with_parent
self.assert_sql_count(testing.db, go, 1)
+class SetOpsWDeferredTest(QueryTest, AssertsCompiledSQL):
+ __dialect__ = "default"
+
+ run_setup_mappers = None
+
+ @testing.fixture
+ def deferred_fixture(self):
+ User = self.classes.User
+ users = self.tables.users
+
+ mapper(
+ User,
+ users,
+ properties={
+ "name": deferred(users.c.name),
+ "name_upper": column_property(
+ func.upper(users.c.name), deferred=True
+ ),
+ },
+ )
+
+ return User
+
+ def test_flat_twolevel_union_deferred(self, deferred_fixture):
+ """test #6678
+
+ note that due to #6661, the SELECTs inside the union include the
+ deferred "name" column. this so we can switch to undeferred on
+ the outside. this didn't work in 1.3.
+
+ """
+ User = deferred_fixture
+
+ s = fixture_session()
+
+ s1 = s.query(User).filter(User.id == 7)
+ s2 = s.query(User).filter(User.id == 8)
+
+ stmt = s1.union(s2).order_by(User.id)
+ self.assert_compile(
+ stmt,
+ "SELECT anon_1.users_id AS anon_1_users_id FROM "
+ "(SELECT users.name AS users_name, users.id AS users_id "
+ "FROM users WHERE users.id = :id_1 "
+ "UNION "
+ "SELECT users.name AS users_name, users.id AS users_id FROM users "
+ "WHERE users.id = :id_2) AS anon_1 ORDER BY anon_1.users_id",
+ )
+
+ recs = stmt.all()
+ eq_(recs, [User(id=7), User(id=8)])
+ for rec in recs:
+ assert "name" not in rec.__dict__
+
+ eq_(stmt.count(), 2)
+
+ def test_flat_twolevel_union_undeferred(self, deferred_fixture):
+ """test #6678
+
+ in this case we want to see that the unions include the deferred
+ columns so that if we undefer on the outside we can get the
+ column. #6661 allows this.
+
+ """
+ User = deferred_fixture
+
+ s = fixture_session()
+
+ s1 = s.query(User).filter(User.id == 7)
+ s2 = s.query(User).filter(User.id == 8)
+
+ stmt = s1.union(s2).options(undefer(User.name)).order_by(User.id)
+ self.assert_compile(
+ stmt,
+ "SELECT anon_1.users_name AS anon_1_users_name, "
+ "anon_1.users_id AS anon_1_users_id FROM "
+ "(SELECT users.name AS users_name, users.id AS users_id "
+ "FROM users WHERE users.id = :id_1 "
+ "UNION "
+ "SELECT users.name AS users_name, users.id AS users_id "
+ "FROM users WHERE users.id = :id_2) AS anon_1 "
+ "ORDER BY anon_1.users_id",
+ )
+
+ recs = stmt.all()
+ for rec in recs:
+ assert "name" in rec.__dict__
+ eq_(
+ recs,
+ [
+ User(id=7, name="jack"),
+ User(id=8, name="ed"),
+ ],
+ )
+
+ eq_(stmt.count(), 2)
+
+ def test_nested_union_deferred(self, deferred_fixture):
+ """test #6678
+
+ note that due to #6661, the SELECTs inside the union include the
+ deferred "name" column. this so we can switch to undeferred on
+ the outside. this didn't work in 1.3.
+
+ """
+ User = deferred_fixture
+
+ s = fixture_session()
+
+ s1 = s.query(User).filter(User.id == 7)
+ s2 = s.query(User).filter(User.id == 8)
+
+ s3 = s.query(User).filter(User.id == 9)
+
+ stmt = s1.union(s2).union(s3).order_by(User.id)
+ self.assert_compile(
+ stmt,
+ "SELECT anon_1.anon_2_users_id AS anon_1_anon_2_users_id "
+ "FROM ("
+ "SELECT anon_2.users_name AS anon_2_users_name, "
+ "anon_2.users_id AS anon_2_users_id FROM "
+ "(SELECT users.name AS users_name, users.id AS users_id "
+ "FROM users WHERE users.id = :id_1 UNION "
+ "SELECT users.name AS users_name, users.id AS users_id "
+ "FROM users WHERE users.id = :id_2) AS anon_2 "
+ "UNION "
+ "SELECT users.name AS users_name, users.id AS users_id FROM users "
+ "WHERE users.id = :id_3) AS anon_1 "
+ "ORDER BY anon_1.anon_2_users_id",
+ )
+
+ recs = stmt.all()
+ eq_(recs, [User(id=7), User(id=8), User(id=9)])
+ for rec in recs:
+ assert "name" not in rec.__dict__
+
+ eq_(stmt.count(), 3)
+
+ def test_nested_union_undeferred(self, deferred_fixture):
+ """test #6678
+
+ in this case we want to see that the unions include the deferred
+ columns so that if we undefer on the outside we can get the
+ column. #6661 allows this.
+
+ """
+ User = deferred_fixture
+
+ s = fixture_session()
+
+ s1 = s.query(User).filter(User.id == 7)
+ s2 = s.query(User).filter(User.id == 8)
+
+ s3 = s.query(User).filter(User.id == 9)
+
+ stmt = (
+ s1.union(s2)
+ .union(s3)
+ .options(undefer(User.name))
+ .order_by(User.id)
+ )
+ self.assert_compile(
+ stmt,
+ "SELECT anon_1.anon_2_users_name AS anon_1_anon_2_users_name, "
+ "anon_1.anon_2_users_id AS anon_1_anon_2_users_id "
+ "FROM ("
+ "SELECT anon_2.users_name AS anon_2_users_name, "
+ "anon_2.users_id AS anon_2_users_id FROM "
+ "(SELECT users.name AS users_name, users.id AS users_id "
+ "FROM users WHERE users.id = :id_1 UNION "
+ "SELECT users.name AS users_name, users.id AS users_id "
+ "FROM users WHERE users.id = :id_2) AS anon_2 "
+ "UNION "
+ "SELECT users.name AS users_name, users.id AS users_id FROM users "
+ "WHERE users.id = :id_3) AS anon_1 "
+ "ORDER BY anon_1.anon_2_users_id",
+ )
+
+ recs = stmt.all()
+ for rec in recs:
+ assert "name" in rec.__dict__
+ eq_(
+ recs,
+ [
+ User(id=7, name="jack"),
+ User(id=8, name="ed"),
+ User(id=9, name="fred"),
+ ],
+ )
+
+ eq_(stmt.count(), 3)
+
+
class AggregateTest(QueryTest):
def test_sum(self):
Order = self.classes.Order