"""
col = func.count(literal_column("*"))
- bq = self.bq.with_criteria(lambda q: q.from_self(col))
+ bq = self.bq.with_criteria(lambda q: q._from_self(col))
return bq.for_session(self.session).params(self._params).scalar()
def scalar(self):
parent.path + self.prop._wildcard_token,
)
self._default_path_loader_key = self.prop._default_path_loader_key
- self._loader_key = ("loader", self.path)
+ self._loader_key = ("loader", self.natural_path)
def __str__(self):
return " -> ".join(str(elem) for elem in self.path)
self.natural_path = parent.natural_path + (
parent.natural_path[-1].entity,
)
+ # it seems to make sense that since these paths get mixed up
+ # with statements that are cached or not, we should make
+ # sure the natural path is cachable across different occurrences
+ # of equivalent AliasedClass objects. however, so far this
+ # does not seem to be needed for whatever reason.
+ # elif not parent.path and self.is_aliased_class:
+ # self.natural_path = (self.entity._generate_cache_key()[0], )
else:
+ # self.natural_path = parent.natural_path + (entity, )
self.natural_path = self.path
@property
self.session = session
self._set_entities(entities)
+ def _set_propagate_attrs(self, values):
+ self._propagate_attrs = util.immutabledict(values)
+ return self
+
def _set_entities(self, entities):
self._raw_columns = [
coercions.expect(
"""
- self._limit_clause, self._offset_clause = orm_util._make_slice(
+ self._limit_clause, self._offset_clause = sql_util._make_slice(
self._limit_clause, self._offset_clause, start, stop
)
``Query``.
"""
- self._limit_clause = orm_util._offset_or_limit_clause(limit)
+ self._limit_clause = sql_util._offset_or_limit_clause(limit)
@_generative
@_assertions(_no_statement_condition)
``Query``.
"""
- self._offset_clause = orm_util._offset_or_limit_clause(offset)
+ self._offset_clause = sql_util._offset_or_limit_clause(offset)
@_generative
@_assertions(_no_statement_condition)
(("lazy", "select"),)
).init_class_attribute(mapper)
- def _get_leftmost(self, subq_path):
+ def _get_leftmost(self, subq_path, current_compile_state, is_root):
+ given_subq_path = subq_path
subq_path = subq_path.path
subq_mapper = orm_util._class_to_mapper(subq_path[0])
else:
leftmost_mapper, leftmost_prop = subq_mapper, subq_path[1]
+ if is_root:
+ # the subq_path is also coming from cached state, so when we start
+ # building up this path, it has to also be converted to be in terms
+ # of the current state. this is for the specific case of the entity
+ # is an AliasedClass against a subquery that's not otherwise going
+ # to adapt
+ new_subq_path = current_compile_state._entities[
+ 0
+ ].entity_zero._path_registry[leftmost_prop]
+ else:
+ new_subq_path = given_subq_path
+
leftmost_cols = leftmost_prop.local_columns
leftmost_attr = [
getattr(
- subq_path[0].entity, leftmost_mapper._columntoproperty[c].key
+ new_subq_path.path[0].entity,
+ leftmost_mapper._columntoproperty[c].key,
)
for c in leftmost_cols
]
- return leftmost_mapper, leftmost_attr, leftmost_prop
+ return leftmost_mapper, leftmost_attr, leftmost_prop, new_subq_path
def _generate_from_original_query(
self,
return q
- def _setup_options(self, q, subq_path, orig_query, effective_entity):
+ def _setup_options(
+ self, q, subq_path, rewritten_path, orig_query, effective_entity
+ ):
# propagate loader options etc. to the new query.
# these will fire relative to subq_path.
- q = q._with_current_path(subq_path)
+ q = q._with_current_path(rewritten_path)
q = q.options(*orig_query._with_options)
return q
else:
effective_entity = self.entity
- subq_path = context.query._execution_options.get(
- ("subquery_path", None), orm_util.PathRegistry.root
+ subq_path, rewritten_path = context.query._execution_options.get(
+ ("subquery_paths", None),
+ (orm_util.PathRegistry.root, orm_util.PathRegistry.root),
)
-
+ is_root = subq_path is orm_util.PathRegistry.root
subq_path = subq_path + path
+ rewritten_path = rewritten_path + path
# if not via query option, check for
# a cycle
elif subq_path.contains_mapper(self.mapper):
return
- (
- leftmost_mapper,
- leftmost_attr,
- leftmost_relationship,
- ) = self._get_leftmost(subq_path)
-
# use the current query being invoked, not the compile state
# one. this is so that we get the current parameters. however,
# it means we can't use the existing compile state, we have to make
orig_query
)
+ (
+ leftmost_mapper,
+ leftmost_attr,
+ leftmost_relationship,
+ rewritten_path,
+ ) = self._get_leftmost(rewritten_path, orig_compile_state, is_root)
+
# generate a new Query from the original, then
# produce a subquery from it.
left_alias = self._generate_from_original_query(
q._execution_options = q._execution_options.union(
{
("orig_query", SubqueryLoader): orig_query,
- ("subquery_path", None): subq_path,
+ ("subquery_paths", None): (subq_path, rewritten_path),
}
)
q, to_join, left_alias, parent_alias, effective_entity
)
- q = self._setup_options(q, subq_path, orig_query, effective_entity)
+ q = self._setup_options(
+ q, subq_path, rewritten_path, orig_query, effective_entity
+ )
q = self._setup_outermost_orderby(q)
return q
) = session.set = mapper.set = dependency.set = RandomSet
-def _offset_or_limit_clause(element, name=None, type_=None):
- """Convert the given value to an "offset or limit" clause.
-
- This handles incoming integers and converts to an expression; if
- an expression is already given, it is passed through.
-
- """
- return coercions.expect(
- roles.LimitOffsetRole, element, name=name, type_=type_
- )
-
-
-def _offset_or_limit_clause_asint_if_possible(clause):
- """Return the offset or limit clause as a simple integer if possible,
- else return the clause.
-
- """
- if clause is None:
- return None
- if hasattr(clause, "_limit_offset_value"):
- value = clause._limit_offset_value
- return util.asint(value)
- else:
- return clause
-
-
-def _make_slice(limit_clause, offset_clause, start, stop):
- """Compute LIMIT/OFFSET in terms of slice start/end
- """
-
- # for calculated limit/offset, try to do the addition of
- # values to offset in Python, however if a SQL clause is present
- # then the addition has to be on the SQL side.
- if start is not None and stop is not None:
- offset_clause = _offset_or_limit_clause_asint_if_possible(
- offset_clause
- )
- if offset_clause is None:
- offset_clause = 0
-
- if start != 0:
- offset_clause = offset_clause + start
-
- if offset_clause == 0:
- offset_clause = None
- else:
- offset_clause = _offset_or_limit_clause(offset_clause)
-
- limit_clause = _offset_or_limit_clause(stop - start)
-
- elif start is None and stop is not None:
- limit_clause = _offset_or_limit_clause(stop)
- elif start is not None and stop is None:
- offset_clause = _offset_or_limit_clause_asint_if_possible(
- offset_clause
- )
- if offset_clause is None:
- offset_clause = 0
-
- if start != 0:
- offset_clause = offset_clause + start
-
- if offset_clause == 0:
- offset_clause = None
- else:
- offset_clause = _offset_or_limit_clause(offset_clause)
-
- return limit_clause, offset_clause
-
-
def _getitem(iterable_query, item):
"""calculate __getitem__ in terms of an iterable query object
that also has a slice() method.
code=None,
err=None,
):
+ if resolved is not None and resolved is not element:
+ got = "%r object resolved from %r object" % (resolved, element)
+ else:
+ got = repr(element)
+
if argname:
- msg = "%s expected for argument %r; got %r." % (
+ msg = "%s expected for argument %r; got %s." % (
self.name,
argname,
- element,
+ got,
)
else:
- msg = "%s expected, got %r." % (self.name, element)
+ msg = "%s expected, got %s." % (self.name, got)
if advice:
msg += " " + advice
advice = (
"To create a "
"FROM clause from a %s object, use the .subquery() method."
- % (element.__class__,)
+ % (resolved.__class__ if resolved is not None else element,)
)
code = "89ve"
else:
self._offset_clause = self._offset_or_limit_clause(offset)
+ @_generative
+ @util.preload_module("sqlalchemy.sql.util")
+ def slice(self, start, stop):
+ """Apply LIMIT / OFFSET to this statement based on a slice.
+
+ The start and stop indices behave like the argument to Python's
+ built-in :func:`range` function. This method provides an
+ alternative to using ``LIMIT``/``OFFSET`` to get a slice of the
+ query.
+
+ For example, ::
+
+ stmt = select(User).order_by(User).id.slice(1, 3)
+
+ renders as
+
+ .. sourcecode:: sql
+
+ SELECT users.id AS users_id,
+ users.name AS users_name
+ FROM users ORDER BY users.id
+ LIMIT ? OFFSET ?
+ (2, 1)
+
+ .. versionadded:: 1.4 Added the :meth:`_sql.GenerativeSelect.slice`
+ method generalized from the ORM.
+
+ .. seealso::
+
+ :meth:`_sql.GenerativeSelect.limit`
+
+ :meth:`_sql.GenerativeSelect.offset`
+
+ """
+ sql_util = util.preloaded.sql_util
+ self._limit_clause, self._offset_clause = sql_util._make_slice(
+ self._limit_clause, self._offset_clause, start, stop
+ )
+
@_generative
def order_by(self, *clauses):
r"""Return a new selectable with the given list of ORDER BY
from collections import deque
from itertools import chain
+from . import coercions
from . import operators
+from . import roles
from . import visitors
from .annotation import _deep_annotate # noqa
from .annotation import _deep_deannotate # noqa
def __setstate__(self, state):
self.__dict__.update(state)
self.columns = util.WeakPopulateDict(self._locate_col)
+
+
+def _offset_or_limit_clause(element, name=None, type_=None):
+ """Convert the given value to an "offset or limit" clause.
+
+ This handles incoming integers and converts to an expression; if
+ an expression is already given, it is passed through.
+
+ """
+ return coercions.expect(
+ roles.LimitOffsetRole, element, name=name, type_=type_
+ )
+
+
+def _offset_or_limit_clause_asint_if_possible(clause):
+ """Return the offset or limit clause as a simple integer if possible,
+ else return the clause.
+
+ """
+ if clause is None:
+ return None
+ if hasattr(clause, "_limit_offset_value"):
+ value = clause._limit_offset_value
+ return util.asint(value)
+ else:
+ return clause
+
+
+def _make_slice(limit_clause, offset_clause, start, stop):
+ """Compute LIMIT/OFFSET in terms of slice start/end
+ """
+
+ # for calculated limit/offset, try to do the addition of
+ # values to offset in Python, however if a SQL clause is present
+ # then the addition has to be on the SQL side.
+ if start is not None and stop is not None:
+ offset_clause = _offset_or_limit_clause_asint_if_possible(
+ offset_clause
+ )
+ if offset_clause is None:
+ offset_clause = 0
+
+ if start != 0:
+ offset_clause = offset_clause + start
+
+ if offset_clause == 0:
+ offset_clause = None
+ else:
+ offset_clause = _offset_or_limit_clause(offset_clause)
+
+ limit_clause = _offset_or_limit_clause(stop - start)
+
+ elif start is None and stop is not None:
+ limit_clause = _offset_or_limit_clause(stop)
+ elif start is not None and stop is None:
+ offset_clause = _offset_or_limit_clause_asint_if_possible(
+ offset_clause
+ )
+ if offset_clause is None:
+ offset_clause = 0
+
+ if start != 0:
+ offset_clause = offset_clause + start
+
+ if offset_clause == 0:
+ offset_clause = None
+ else:
+ offset_clause = _offset_or_limit_clause(offset_clause)
+
+ return limit_clause, offset_clause
# ORM Query
#
r"The Query\.get\(\) function",
- r"The Query\.from_self\(\) function",
+ # r"The Query\.from_self\(\) function",
#
# ORM Session
#
mapper(Foo, self.table)
sess = Session()
+ subq = sess.query(Foo).subquery()
+
+ f1 = aliased(Foo, subq)
self.assert_compile(
- sess.query(Foo).from_self().distinct(Foo.a, Foo.b),
- "SELECT DISTINCT ON (anon_1.t_a, anon_1.t_b) anon_1.t_id "
- "AS anon_1_t_id, anon_1.t_a AS anon_1_t_a, anon_1.t_b "
- "AS anon_1_t_b FROM (SELECT t.id AS t_id, t.a AS t_a, "
- "t.b AS t_b FROM t) AS anon_1",
+ sess.query(f1).distinct(f1.a, f1.b),
+ "SELECT DISTINCT ON (anon_1.a, anon_1.b) anon_1.id "
+ "AS anon_1_id, anon_1.a AS anon_1_a, anon_1.b "
+ "AS anon_1_b FROM (SELECT t.id AS id, t.a AS a, "
+ "t.b AS b FROM t) AS anon_1",
)
def test_query_distinct_on_aliased(self):
bq = self.bakery(lambda s: s.query(User.id, User.name))
- bq += lambda q: q.from_self().with_entities(func.count(User.id))
+ bq += lambda q: q._from_self().with_entities(func.count(User.id))
for i in range(3):
session = Session(autocommit=True)
bq += lambda q: q.filter(User.name == "jack")
if cond4:
- bq += lambda q: q.from_self().with_entities(
+ bq += lambda q: q._from_self().with_entities(
func.count(User.id)
)
sess = Session(autocommit=True)
import sqlalchemy as sa
from sqlalchemy import and_
from sqlalchemy import cast
+from sqlalchemy import column
from sqlalchemy import desc
from sqlalchemy import event
from sqlalchemy import exc as sa_exc
+from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import Integer
from sqlalchemy import literal_column
from sqlalchemy import or_
from sqlalchemy import select
from sqlalchemy import String
+from sqlalchemy import table
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import true
from sqlalchemy.orm import aliased
from sqlalchemy.orm import as_declarative
from sqlalchemy.orm import attributes
+from sqlalchemy.orm import backref
from sqlalchemy.orm import collections
from sqlalchemy.orm import column_property
from sqlalchemy.orm import configure_mappers
from sqlalchemy.orm import relation
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Session
+from sqlalchemy.orm import subqueryload
from sqlalchemy.orm import synonym
from sqlalchemy.orm import undefer
from sqlalchemy.orm import with_polymorphic
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
+from sqlalchemy.testing.assertsql import CompiledSQL
+from sqlalchemy.testing.fixtures import ComparableEntity
from sqlalchemy.testing.mock import call
from sqlalchemy.testing.mock import Mock
from sqlalchemy.testing.schema import Column
)
+class SelfRefFromSelfTest(fixtures.MappedTest, AssertsCompiledSQL):
+ run_setup_mappers = "once"
+ run_inserts = "once"
+ run_deletes = None
+ __dialect__ = "default"
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "nodes",
+ metadata,
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("parent_id", Integer, ForeignKey("nodes.id")),
+ Column("data", String(30)),
+ )
+
+ @classmethod
+ def setup_classes(cls):
+ class Node(cls.Comparable):
+ def append(self, node):
+ self.children.append(node)
+
+ @classmethod
+ def setup_mappers(cls):
+ Node, nodes = cls.classes.Node, cls.tables.nodes
+
+ mapper(
+ Node,
+ nodes,
+ properties={
+ "children": relationship(
+ Node,
+ lazy="select",
+ join_depth=3,
+ backref=backref("parent", remote_side=[nodes.c.id]),
+ )
+ },
+ )
+
+ @classmethod
+ def insert_data(cls, connection):
+ Node = cls.classes.Node
+
+ sess = create_session(connection)
+ n1 = Node(data="n1")
+ n1.append(Node(data="n11"))
+ n1.append(Node(data="n12"))
+ n1.append(Node(data="n13"))
+ n1.children[1].append(Node(data="n121"))
+ n1.children[1].append(Node(data="n122"))
+ n1.children[1].append(Node(data="n123"))
+ sess.add(n1)
+ sess.flush()
+ sess.close()
+
+ def test_from_self_inside_excludes_outside(self):
+ """test the propagation of aliased() from inside to outside
+ on a from_self()..
+ """
+
+ Node = self.classes.Node
+
+ sess = create_session()
+
+ n1 = aliased(Node)
+
+ # n1 is not inside the from_self(), so all cols must be maintained
+ # on the outside
+ with self._from_self_deprecated():
+ self.assert_compile(
+ sess.query(Node)
+ .filter(Node.data == "n122")
+ .from_self(n1, Node.id),
+ "SELECT nodes_1.id AS nodes_1_id, "
+ "nodes_1.parent_id AS nodes_1_parent_id, "
+ "nodes_1.data AS nodes_1_data, anon_1.nodes_id "
+ "AS anon_1_nodes_id "
+ "FROM nodes AS nodes_1, (SELECT nodes.id AS nodes_id, "
+ "nodes.parent_id AS nodes_parent_id, "
+ "nodes.data AS nodes_data FROM "
+ "nodes WHERE nodes.data = :data_1) AS anon_1",
+ use_default_dialect=True,
+ )
+
+ parent = aliased(Node)
+ grandparent = aliased(Node)
+ with self._from_self_deprecated():
+ q = (
+ sess.query(Node, parent, grandparent)
+ .join(parent, Node.parent)
+ .join(grandparent, parent.parent)
+ .filter(Node.data == "n122")
+ .filter(parent.data == "n12")
+ .filter(grandparent.data == "n1")
+ .from_self()
+ .limit(1)
+ )
+
+ # parent, grandparent *are* inside the from_self(), so they
+ # should get aliased to the outside.
+ self.assert_compile(
+ q,
+ "SELECT anon_1.nodes_id AS anon_1_nodes_id, "
+ "anon_1.nodes_parent_id AS anon_1_nodes_parent_id, "
+ "anon_1.nodes_data AS anon_1_nodes_data, "
+ "anon_1.nodes_1_id AS anon_1_nodes_1_id, "
+ "anon_1.nodes_1_parent_id AS anon_1_nodes_1_parent_id, "
+ "anon_1.nodes_1_data AS anon_1_nodes_1_data, "
+ "anon_1.nodes_2_id AS anon_1_nodes_2_id, "
+ "anon_1.nodes_2_parent_id AS anon_1_nodes_2_parent_id, "
+ "anon_1.nodes_2_data AS anon_1_nodes_2_data "
+ "FROM (SELECT nodes.id AS nodes_id, nodes.parent_id "
+ "AS nodes_parent_id, nodes.data AS nodes_data, "
+ "nodes_1.id AS nodes_1_id, "
+ "nodes_1.parent_id AS nodes_1_parent_id, "
+ "nodes_1.data AS nodes_1_data, nodes_2.id AS nodes_2_id, "
+ "nodes_2.parent_id AS nodes_2_parent_id, nodes_2.data AS "
+ "nodes_2_data FROM nodes JOIN nodes AS nodes_1 ON "
+ "nodes_1.id = nodes.parent_id JOIN nodes AS nodes_2 "
+ "ON nodes_2.id = nodes_1.parent_id "
+ "WHERE nodes.data = :data_1 AND nodes_1.data = :data_2 AND "
+ "nodes_2.data = :data_3) AS anon_1 LIMIT :param_1",
+ {"param_1": 1},
+ use_default_dialect=True,
+ )
+
+ def test_multiple_explicit_entities_two(self):
+ Node = self.classes.Node
+
+ sess = create_session()
+
+ parent = aliased(Node)
+ grandparent = aliased(Node)
+ with self._from_self_deprecated():
+ eq_(
+ sess.query(Node, parent, grandparent)
+ .join(parent, Node.parent)
+ .join(grandparent, parent.parent)
+ .filter(Node.data == "n122")
+ .filter(parent.data == "n12")
+ .filter(grandparent.data == "n1")
+ .from_self()
+ .first(),
+ (Node(data="n122"), Node(data="n12"), Node(data="n1")),
+ )
+
+ def test_multiple_explicit_entities_three(self):
+ Node = self.classes.Node
+
+ sess = create_session()
+
+ parent = aliased(Node)
+ grandparent = aliased(Node)
+ # same, change order around
+ with self._from_self_deprecated():
+ eq_(
+ sess.query(parent, grandparent, Node)
+ .join(parent, Node.parent)
+ .join(grandparent, parent.parent)
+ .filter(Node.data == "n122")
+ .filter(parent.data == "n12")
+ .filter(grandparent.data == "n1")
+ .from_self()
+ .first(),
+ (Node(data="n12"), Node(data="n1"), Node(data="n122")),
+ )
+
+ def test_multiple_explicit_entities_five(self):
+ Node = self.classes.Node
+
+ sess = create_session()
+
+ parent = aliased(Node)
+ grandparent = aliased(Node)
+ with self._from_self_deprecated():
+ eq_(
+ sess.query(Node, parent, grandparent)
+ .join(parent, Node.parent)
+ .join(grandparent, parent.parent)
+ .filter(Node.data == "n122")
+ .filter(parent.data == "n12")
+ .filter(grandparent.data == "n1")
+ .from_self()
+ .options(joinedload(Node.children))
+ .first(),
+ (Node(data="n122"), Node(data="n12"), Node(data="n1")),
+ )
+
+ def _from_self_deprecated(self):
+ return testing.expect_deprecated_20(
+ r"The Query.from_self\(\) function/method"
+ )
+
+
+class FromSelfTest(QueryTest, AssertsCompiledSQL):
+ __dialect__ = "default"
+
+ def _from_self_deprecated(self):
+ return testing.expect_deprecated_20(
+ r"The Query.from_self\(\) function/method"
+ )
+
+ def test_illegal_operations(self):
+
+ User = self.classes.User
+
+ s = Session()
+
+ with self._from_self_deprecated():
+ q = s.query(User).from_self()
+ assert_raises_message(
+ sa.exc.InvalidRequestError,
+ r"Can't call Query.update\(\) or Query.delete\(\)",
+ q.update,
+ {},
+ )
+
+ assert_raises_message(
+ sa.exc.InvalidRequestError,
+ r"Can't call Query.update\(\) or Query.delete\(\)",
+ q.delete,
+ {},
+ )
+
+ def test_columns_augmented_distinct_on(self):
+ User, Address = self.classes.User, self.classes.Address
+
+ sess = create_session()
+
+ with self._from_self_deprecated():
+ q = (
+ sess.query(
+ User.id,
+ User.name.label("foo"),
+ Address.id,
+ Address.email_address,
+ )
+ .distinct(Address.email_address)
+ .order_by(User.id, User.name, Address.email_address)
+ .from_self(User.id, User.name.label("foo"), Address.id)
+ )
+
+ # Address.email_address is added because of DISTINCT,
+ # however User.id, User.name are not b.c. they're already there,
+ # even though User.name is labeled
+ self.assert_compile(
+ q,
+ "SELECT anon_1.users_id AS anon_1_users_id, anon_1.foo AS foo, "
+ "anon_1.addresses_id AS anon_1_addresses_id "
+ "FROM ("
+ "SELECT DISTINCT ON (addresses.email_address) "
+ "users.id AS users_id, users.name AS foo, "
+ "addresses.id AS addresses_id, addresses.email_address AS "
+ "addresses_email_address FROM users, addresses ORDER BY "
+ "users.id, users.name, addresses.email_address"
+ ") AS anon_1",
+ dialect="postgresql",
+ )
+
+ def test_columns_augmented_roundtrip_one_from_self(self):
+ """Test workaround for legacy style DISTINCT on extra column.
+
+ See #5134
+
+ """
+ User, Address = self.classes.User, self.classes.Address
+
+ sess = create_session()
+ with self._from_self_deprecated():
+ q = (
+ sess.query(User, Address.email_address)
+ .join("addresses")
+ .distinct()
+ .from_self(User)
+ .order_by(desc(Address.email_address))
+ )
+
+ eq_([User(id=7), User(id=9), User(id=8)], q.all())
+
+ def test_columns_augmented_roundtrip_three_from_self(self):
+ """Test workaround for legacy style DISTINCT on extra column.
+
+ See #5134
+
+ """
+
+ User, Address = self.classes.User, self.classes.Address
+
+ sess = create_session()
+
+ with self._from_self_deprecated():
+ q = (
+ sess.query(
+ User.id,
+ User.name.label("foo"),
+ Address.id,
+ Address.email_address,
+ )
+ .join(Address, true())
+ .filter(User.name == "jack")
+ .filter(User.id + Address.user_id > 0)
+ .distinct()
+ .from_self(User.id, User.name.label("foo"), Address.id)
+ .order_by(User.id, User.name, Address.email_address)
+ )
+
+ eq_(
+ q.all(),
+ [
+ (7, "jack", 3),
+ (7, "jack", 4),
+ (7, "jack", 2),
+ (7, "jack", 5),
+ (7, "jack", 1),
+ ],
+ )
+ for row in q:
+ eq_(row._mapping.keys(), ["id", "foo", "id"])
+
+ def test_clause_onclause(self):
+ Order, User = (
+ self.classes.Order,
+ self.classes.User,
+ )
+
+ sess = create_session()
+ # explicit onclause with from_self(), means
+ # the onclause must be aliased against the query's custom
+ # FROM object
+ with self._from_self_deprecated():
+ eq_(
+ sess.query(User)
+ .order_by(User.id)
+ .offset(2)
+ .from_self()
+ .join(Order, User.id == Order.user_id)
+ .all(),
+ [User(name="fred")],
+ )
+
+ def test_from_self_resets_joinpaths(self):
+ """test a join from from_self() doesn't confuse joins inside the subquery
+ with the outside.
+ """
+
+ Item, Keyword = self.classes.Item, self.classes.Keyword
+
+ sess = create_session()
+
+ with self._from_self_deprecated():
+ self.assert_compile(
+ sess.query(Item)
+ .join(Item.keywords)
+ .from_self(Keyword)
+ .join(Item.keywords),
+ "SELECT keywords.id AS keywords_id, "
+ "keywords.name AS keywords_name "
+ "FROM (SELECT items.id AS items_id, "
+ "items.description AS items_description "
+ "FROM items JOIN item_keywords AS item_keywords_1 "
+ "ON items.id = "
+ "item_keywords_1.item_id JOIN keywords "
+ "ON keywords.id = item_keywords_1.keyword_id) "
+ "AS anon_1 JOIN item_keywords AS item_keywords_2 ON "
+ "anon_1.items_id = item_keywords_2.item_id "
+ "JOIN keywords ON "
+ "keywords.id = item_keywords_2.keyword_id",
+ use_default_dialect=True,
+ )
+
+ def test_single_prop_9(self):
+ User = self.classes.User
+
+ sess = create_session()
+ with self._from_self_deprecated():
+ self.assert_compile(
+ sess.query(User)
+ .filter(User.name == "ed")
+ .from_self()
+ .join(User.orders),
+ "SELECT anon_1.users_id AS anon_1_users_id, "
+ "anon_1.users_name AS anon_1_users_name "
+ "FROM (SELECT users.id AS users_id, users.name AS users_name "
+ "FROM users "
+ "WHERE users.name = :name_1) AS anon_1 JOIN orders "
+ "ON anon_1.users_id = orders.user_id",
+ )
+
+ def test_anonymous_expression_from_self_twice_oldstyle(self):
+ # relies upon _orm_only_from_obj_alias setting
+
+ sess = create_session()
+ c1, c2 = column("c1"), column("c2")
+ q1 = sess.query(c1, c2).filter(c1 == "dog")
+ with self._from_self_deprecated():
+ q1 = q1.from_self().from_self()
+ self.assert_compile(
+ q1.order_by(c1),
+ "SELECT anon_1.anon_2_c1 AS anon_1_anon_2_c1, anon_1.anon_2_c2 AS "
+ "anon_1_anon_2_c2 FROM (SELECT anon_2.c1 AS anon_2_c1, anon_2.c2 "
+ "AS anon_2_c2 "
+ "FROM (SELECT c1, c2 WHERE c1 = :c1_1) AS "
+ "anon_2) AS anon_1 ORDER BY anon_1.anon_2_c1",
+ )
+
+ def test_anonymous_expression_plus_flag_aliased_join(self):
+ """test that the 'dont alias non-ORM' rule remains for other
+ kinds of aliasing when _from_selectable() is used."""
+
+ User = self.classes.User
+ Address = self.classes.Address
+ addresses = self.tables.addresses
+
+ sess = create_session()
+ q1 = sess.query(User.id).filter(User.id > 5)
+ with self._from_self_deprecated():
+ q1 = q1.from_self()
+
+ q1 = q1.join(User.addresses, aliased=True).order_by(
+ User.id, Address.id, addresses.c.id
+ )
+
+ self.assert_compile(
+ q1,
+ "SELECT anon_1.users_id AS anon_1_users_id "
+ "FROM (SELECT users.id AS users_id FROM users "
+ "WHERE users.id > :id_1) AS anon_1 JOIN addresses AS addresses_1 "
+ "ON anon_1.users_id = addresses_1.user_id "
+ "ORDER BY anon_1.users_id, addresses_1.id, addresses.id",
+ )
+
+ def test_anonymous_expression_plus_explicit_aliased_join(self):
+ """test that the 'dont alias non-ORM' rule remains for other
+ kinds of aliasing when _from_selectable() is used."""
+
+ User = self.classes.User
+ Address = self.classes.Address
+ addresses = self.tables.addresses
+
+ sess = create_session()
+ q1 = sess.query(User.id).filter(User.id > 5)
+ with self._from_self_deprecated():
+ q1 = q1.from_self()
+
+ aa = aliased(Address)
+ q1 = q1.join(aa, User.addresses).order_by(
+ User.id, aa.id, addresses.c.id
+ )
+ self.assert_compile(
+ q1,
+ "SELECT anon_1.users_id AS anon_1_users_id "
+ "FROM (SELECT users.id AS users_id FROM users "
+ "WHERE users.id > :id_1) AS anon_1 JOIN addresses AS addresses_1 "
+ "ON anon_1.users_id = addresses_1.user_id "
+ "ORDER BY anon_1.users_id, addresses_1.id, addresses.id",
+ )
+
+ def test_table_anonymous_expression_from_self_twice_oldstyle(self):
+ # relies upon _orm_only_from_obj_alias setting
+ from sqlalchemy.sql import column
+
+ sess = create_session()
+ t1 = table("t1", column("c1"), column("c2"))
+ q1 = sess.query(t1.c.c1, t1.c.c2).filter(t1.c.c1 == "dog")
+ with self._from_self_deprecated():
+ q1 = q1.from_self().from_self()
+ self.assert_compile(
+ q1.order_by(t1.c.c1),
+ "SELECT anon_1.anon_2_t1_c1 "
+ "AS anon_1_anon_2_t1_c1, anon_1.anon_2_t1_c2 "
+ "AS anon_1_anon_2_t1_c2 "
+ "FROM (SELECT anon_2.t1_c1 AS anon_2_t1_c1, "
+ "anon_2.t1_c2 AS anon_2_t1_c2 FROM (SELECT t1.c1 AS t1_c1, t1.c2 "
+ "AS t1_c2 FROM t1 WHERE t1.c1 = :c1_1) AS anon_2) AS anon_1 "
+ "ORDER BY anon_1.anon_2_t1_c1",
+ )
+
+ def test_self_referential(self):
+ Order = self.classes.Order
+
+ sess = create_session()
+ oalias = aliased(Order)
+
+ with self._from_self_deprecated():
+ for q in [
+ sess.query(Order, oalias)
+ .filter(Order.user_id == oalias.user_id)
+ .filter(Order.user_id == 7)
+ .filter(Order.id > oalias.id)
+ .order_by(Order.id, oalias.id),
+ sess.query(Order, oalias)
+ .filter(Order.id > oalias.id)
+ .from_self()
+ .filter(Order.user_id == oalias.user_id)
+ .filter(Order.user_id == 7)
+ .order_by(Order.id, oalias.id),
+ # same thing, but reversed.
+ sess.query(oalias, Order)
+ .filter(Order.id < oalias.id)
+ .from_self()
+ .filter(oalias.user_id == Order.user_id)
+ .filter(oalias.user_id == 7)
+ .order_by(oalias.id, Order.id),
+ # here we go....two layers of aliasing
+ sess.query(Order, oalias)
+ .filter(Order.user_id == oalias.user_id)
+ .filter(Order.user_id == 7)
+ .filter(Order.id > oalias.id)
+ .from_self()
+ .order_by(Order.id, oalias.id)
+ .limit(10)
+ .options(joinedload(Order.items)),
+ # gratuitous four layers
+ sess.query(Order, oalias)
+ .filter(Order.user_id == oalias.user_id)
+ .filter(Order.user_id == 7)
+ .filter(Order.id > oalias.id)
+ .from_self()
+ .from_self()
+ .from_self()
+ .order_by(Order.id, oalias.id)
+ .limit(10)
+ .options(joinedload(Order.items)),
+ ]:
+
+ eq_(
+ q.all(),
+ [
+ (
+ Order(
+ address_id=1,
+ description="order 3",
+ isopen=1,
+ user_id=7,
+ id=3,
+ ),
+ Order(
+ address_id=1,
+ description="order 1",
+ isopen=0,
+ user_id=7,
+ id=1,
+ ),
+ ),
+ (
+ Order(
+ address_id=None,
+ description="order 5",
+ isopen=0,
+ user_id=7,
+ id=5,
+ ),
+ Order(
+ address_id=1,
+ description="order 1",
+ isopen=0,
+ user_id=7,
+ id=1,
+ ),
+ ),
+ (
+ Order(
+ address_id=None,
+ description="order 5",
+ isopen=0,
+ user_id=7,
+ id=5,
+ ),
+ Order(
+ address_id=1,
+ description="order 3",
+ isopen=1,
+ user_id=7,
+ id=3,
+ ),
+ ),
+ ],
+ )
+
+ def test_from_self_internal_literals_oldstyle(self):
+ # relies upon _orm_only_from_obj_alias setting
+ Order = self.classes.Order
+
+ sess = create_session()
+
+ # ensure column expressions are taken from inside the subquery, not
+ # restated at the top
+ with self._from_self_deprecated():
+ q = (
+ sess.query(
+ Order.id,
+ Order.description,
+ literal_column("'q'").label("foo"),
+ )
+ .filter(Order.description == "order 3")
+ .from_self()
+ )
+ self.assert_compile(
+ q,
+ "SELECT anon_1.orders_id AS "
+ "anon_1_orders_id, "
+ "anon_1.orders_description AS anon_1_orders_description, "
+ "anon_1.foo AS anon_1_foo FROM (SELECT "
+ "orders.id AS orders_id, "
+ "orders.description AS orders_description, "
+ "'q' AS foo FROM orders WHERE "
+ "orders.description = :description_1) AS "
+ "anon_1",
+ )
+ eq_(q.all(), [(3, "order 3", "q")])
+
+ def test_column_access_from_self(self):
+ User = self.classes.User
+ sess = create_session()
+
+ with self._from_self_deprecated():
+ q = sess.query(User).from_self()
+ self.assert_compile(
+ q.filter(User.name == "ed"),
+ "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS "
+ "anon_1_users_name FROM (SELECT users.id AS users_id, users.name "
+ "AS users_name FROM users) AS anon_1 WHERE anon_1.users_name = "
+ ":name_1",
+ )
+
+ def test_column_access_from_self_twice(self):
+ User = self.classes.User
+ sess = create_session()
+
+ with self._from_self_deprecated():
+ q = sess.query(User).from_self(User.id, User.name).from_self()
+ self.assert_compile(
+ q.filter(User.name == "ed"),
+ "SELECT anon_1.anon_2_users_id AS anon_1_anon_2_users_id, "
+ "anon_1.anon_2_users_name AS anon_1_anon_2_users_name FROM "
+ "(SELECT anon_2.users_id AS anon_2_users_id, anon_2.users_name "
+ "AS anon_2_users_name FROM (SELECT users.id AS users_id, "
+ "users.name AS users_name FROM users) AS anon_2) AS anon_1 "
+ "WHERE anon_1.anon_2_users_name = :name_1",
+ )
+
+ def test_column_queries_nine(self):
+ Address, User = (
+ self.classes.Address,
+ self.classes.User,
+ )
+
+ sess = create_session()
+
+ adalias = aliased(Address)
+ # select from aliasing + explicit aliasing
+ with self._from_self_deprecated():
+ eq_(
+ sess.query(User, adalias.email_address, adalias.id)
+ .outerjoin(adalias, User.addresses)
+ .from_self(User, adalias.email_address)
+ .order_by(User.id, adalias.id)
+ .all(),
+ [
+ (User(name="jack", id=7), "jack@bean.com"),
+ (User(name="ed", id=8), "ed@wood.com"),
+ (User(name="ed", id=8), "ed@bettyboop.com"),
+ (User(name="ed", id=8), "ed@lala.com"),
+ (User(name="fred", id=9), "fred@fred.com"),
+ (User(name="chuck", id=10), None),
+ ],
+ )
+
+ def test_column_queries_ten(self):
+ Address, User = (
+ self.classes.Address,
+ self.classes.User,
+ )
+
+ sess = create_session()
+
+ # anon + select from aliasing
+ aa = aliased(Address)
+ with self._from_self_deprecated():
+ eq_(
+ sess.query(User)
+ .join(aa, User.addresses)
+ .filter(aa.email_address.like("%ed%"))
+ .from_self()
+ .all(),
+ [User(name="ed", id=8), User(name="fred", id=9)],
+ )
+
+ def test_column_queries_eleven(self):
+ Address, User = (
+ self.classes.Address,
+ self.classes.User,
+ )
+
+ sess = create_session()
+
+ adalias = aliased(Address)
+ # test eager aliasing, with/without select_entity_from aliasing
+ with self._from_self_deprecated():
+ for q in [
+ sess.query(User, adalias.email_address)
+ .outerjoin(adalias, User.addresses)
+ .options(joinedload(User.addresses))
+ .order_by(User.id, adalias.id)
+ .limit(10),
+ sess.query(User, adalias.email_address, adalias.id)
+ .outerjoin(adalias, User.addresses)
+ .from_self(User, adalias.email_address)
+ .options(joinedload(User.addresses))
+ .order_by(User.id, adalias.id)
+ .limit(10),
+ ]:
+ eq_(
+ q.all(),
+ [
+ (
+ User(
+ addresses=[
+ Address(
+ user_id=7,
+ email_address="jack@bean.com",
+ id=1,
+ )
+ ],
+ name="jack",
+ id=7,
+ ),
+ "jack@bean.com",
+ ),
+ (
+ User(
+ addresses=[
+ Address(
+ user_id=8,
+ email_address="ed@wood.com",
+ id=2,
+ ),
+ Address(
+ user_id=8,
+ email_address="ed@bettyboop.com",
+ id=3,
+ ),
+ Address(
+ user_id=8,
+ email_address="ed@lala.com",
+ id=4,
+ ),
+ ],
+ name="ed",
+ id=8,
+ ),
+ "ed@wood.com",
+ ),
+ (
+ User(
+ addresses=[
+ Address(
+ user_id=8,
+ email_address="ed@wood.com",
+ id=2,
+ ),
+ Address(
+ user_id=8,
+ email_address="ed@bettyboop.com",
+ id=3,
+ ),
+ Address(
+ user_id=8,
+ email_address="ed@lala.com",
+ id=4,
+ ),
+ ],
+ name="ed",
+ id=8,
+ ),
+ "ed@bettyboop.com",
+ ),
+ (
+ User(
+ addresses=[
+ Address(
+ user_id=8,
+ email_address="ed@wood.com",
+ id=2,
+ ),
+ Address(
+ user_id=8,
+ email_address="ed@bettyboop.com",
+ id=3,
+ ),
+ Address(
+ user_id=8,
+ email_address="ed@lala.com",
+ id=4,
+ ),
+ ],
+ name="ed",
+ id=8,
+ ),
+ "ed@lala.com",
+ ),
+ (
+ User(
+ addresses=[
+ Address(
+ user_id=9,
+ email_address="fred@fred.com",
+ id=5,
+ )
+ ],
+ name="fred",
+ id=9,
+ ),
+ "fred@fred.com",
+ ),
+ (User(addresses=[], name="chuck", id=10), None),
+ ],
+ )
+
+ def test_filter(self):
+ User = self.classes.User
+
+ with self._from_self_deprecated():
+ eq_(
+ [User(id=8), User(id=9)],
+ create_session()
+ .query(User)
+ .filter(User.id.in_([8, 9]))
+ .from_self()
+ .all(),
+ )
+
+ with self._from_self_deprecated():
+ eq_(
+ [User(id=8), User(id=9)],
+ create_session()
+ .query(User)
+ .order_by(User.id)
+ .slice(1, 3)
+ .from_self()
+ .all(),
+ )
+
+ with self._from_self_deprecated():
+ eq_(
+ [User(id=8)],
+ list(
+ create_session()
+ .query(User)
+ .filter(User.id.in_([8, 9]))
+ .from_self()
+ .order_by(User.id)[0:1]
+ ),
+ )
+
+ def test_join(self):
+ User, Address = self.classes.User, self.classes.Address
+
+ with self._from_self_deprecated():
+ eq_(
+ [
+ (User(id=8), Address(id=2)),
+ (User(id=8), Address(id=3)),
+ (User(id=8), Address(id=4)),
+ (User(id=9), Address(id=5)),
+ ],
+ create_session()
+ .query(User)
+ .filter(User.id.in_([8, 9]))
+ .from_self()
+ .join("addresses")
+ .add_entity(Address)
+ .order_by(User.id, Address.id)
+ .all(),
+ )
+
+ def test_group_by(self):
+ Address = self.classes.Address
+
+ eq_(
+ create_session()
+ .query(Address.user_id, func.count(Address.id).label("count"))
+ .group_by(Address.user_id)
+ .order_by(Address.user_id)
+ .all(),
+ [(7, 1), (8, 3), (9, 1)],
+ )
+
+ with self._from_self_deprecated():
+ eq_(
+ create_session()
+ .query(Address.user_id, Address.id)
+ .from_self(Address.user_id, func.count(Address.id))
+ .group_by(Address.user_id)
+ .order_by(Address.user_id)
+ .all(),
+ [(7, 1), (8, 3), (9, 1)],
+ )
+
+ def test_having(self):
+ User = self.classes.User
+
+ s = create_session()
+
+ with self._from_self_deprecated():
+ self.assert_compile(
+ s.query(User.id)
+ .group_by(User.id)
+ .having(User.id > 5)
+ .from_self(),
+ "SELECT anon_1.users_id AS anon_1_users_id FROM "
+ "(SELECT users.id AS users_id FROM users GROUP "
+ "BY users.id HAVING users.id > :id_1) AS anon_1",
+ )
+
+ def test_no_joinedload(self):
+ """test that joinedloads are pushed outwards and not rendered in
+ subqueries."""
+
+ User = self.classes.User
+
+ s = create_session()
+
+ with self._from_self_deprecated():
+ q = s.query(User).options(joinedload(User.addresses)).from_self()
+
+ self.assert_compile(
+ q.statement,
+ "SELECT anon_1.users_id, anon_1.users_name, addresses_1.id, "
+ "addresses_1.user_id, addresses_1.email_address FROM "
+ "(SELECT users.id AS users_id, users.name AS "
+ "users_name FROM users) AS anon_1 LEFT OUTER JOIN "
+ "addresses AS addresses_1 ON anon_1.users_id = "
+ "addresses_1.user_id ORDER BY addresses_1.id",
+ )
+
+ def test_aliases(self):
+ """test that aliased objects are accessible externally to a from_self()
+ call."""
+
+ User, Address = self.classes.User, self.classes.Address
+
+ s = create_session()
+
+ ualias = aliased(User)
+
+ with self._from_self_deprecated():
+ eq_(
+ s.query(User, ualias)
+ .filter(User.id > ualias.id)
+ .from_self(User.name, ualias.name)
+ .order_by(User.name, ualias.name)
+ .all(),
+ [
+ ("chuck", "ed"),
+ ("chuck", "fred"),
+ ("chuck", "jack"),
+ ("ed", "jack"),
+ ("fred", "ed"),
+ ("fred", "jack"),
+ ],
+ )
+
+ with self._from_self_deprecated():
+ eq_(
+ s.query(User, ualias)
+ .filter(User.id > ualias.id)
+ .from_self(User.name, ualias.name)
+ .filter(ualias.name == "ed")
+ .order_by(User.name, ualias.name)
+ .all(),
+ [("chuck", "ed"), ("fred", "ed")],
+ )
+
+ with self._from_self_deprecated():
+ eq_(
+ s.query(User, ualias)
+ .filter(User.id > ualias.id)
+ .from_self(ualias.name, Address.email_address)
+ .join(ualias.addresses)
+ .order_by(ualias.name, Address.email_address)
+ .all(),
+ [
+ ("ed", "fred@fred.com"),
+ ("jack", "ed@bettyboop.com"),
+ ("jack", "ed@lala.com"),
+ ("jack", "ed@wood.com"),
+ ("jack", "fred@fred.com"),
+ ],
+ )
+
+ def test_multiple_entities(self):
+ User, Address = self.classes.User, self.classes.Address
+
+ sess = create_session()
+
+ with self._from_self_deprecated():
+ eq_(
+ sess.query(User, Address)
+ .filter(User.id == Address.user_id)
+ .filter(Address.id.in_([2, 5]))
+ .from_self()
+ .all(),
+ [(User(id=8), Address(id=2)), (User(id=9), Address(id=5))],
+ )
+
+ with self._from_self_deprecated():
+ eq_(
+ sess.query(User, Address)
+ .filter(User.id == Address.user_id)
+ .filter(Address.id.in_([2, 5]))
+ .from_self()
+ .options(joinedload("addresses"))
+ .first(),
+ (
+ User(id=8, addresses=[Address(), Address(), Address()]),
+ Address(id=2),
+ ),
+ )
+
+ def test_multiple_with_column_entities_oldstyle(self):
+ # relies upon _orm_only_from_obj_alias setting
+ User = self.classes.User
+
+ sess = create_session()
+
+ with self._from_self_deprecated():
+ eq_(
+ sess.query(User.id)
+ .from_self()
+ .add_columns(func.count().label("foo"))
+ .group_by(User.id)
+ .order_by(User.id)
+ .from_self()
+ .all(),
+ [(7, 1), (8, 1), (9, 1), (10, 1)],
+ )
+
+
+class SubqRelationsFromSelfTest(fixtures.DeclarativeMappedTest):
+ def _from_self_deprecated(self):
+ return testing.expect_deprecated_20(
+ r"The Query.from_self\(\) function/method"
+ )
+
+ @classmethod
+ def setup_classes(cls):
+ Base = cls.DeclarativeBasic
+
+ class A(Base, ComparableEntity):
+ __tablename__ = "a"
+
+ id = Column(Integer, primary_key=True)
+ cs = relationship("C", order_by="C.id")
+
+ class B(Base, ComparableEntity):
+ __tablename__ = "b"
+ id = Column(Integer, primary_key=True)
+ a_id = Column(ForeignKey("a.id"))
+ a = relationship("A")
+ ds = relationship("D", order_by="D.id")
+
+ class C(Base, ComparableEntity):
+ __tablename__ = "c"
+ id = Column(Integer, primary_key=True)
+ a_id = Column(ForeignKey("a.id"))
+
+ class D(Base, ComparableEntity):
+ __tablename__ = "d"
+ id = Column(Integer, primary_key=True)
+ b_id = Column(ForeignKey("b.id"))
+
+ @classmethod
+ def insert_data(cls, connection):
+ A, B, C, D = cls.classes("A", "B", "C", "D")
+
+ s = Session(connection)
+
+ as_ = [A(id=i, cs=[C(), C()],) for i in range(1, 5)]
+
+ s.add_all(
+ [
+ B(a=as_[0], ds=[D()]),
+ B(a=as_[1], ds=[D()]),
+ B(a=as_[2]),
+ B(a=as_[3]),
+ ]
+ )
+
+ s.commit()
+
+ def test_subq_w_from_self_one(self):
+ A, B, C = self.classes("A", "B", "C")
+
+ s = Session()
+
+ cache = {}
+
+ for i in range(3):
+ with self._from_self_deprecated():
+ q = (
+ s.query(B)
+ .execution_options(compiled_cache=cache)
+ .join(B.a)
+ .filter(B.id < 4)
+ .filter(A.id > 1)
+ .from_self()
+ .options(subqueryload(B.a).subqueryload(A.cs))
+ .from_self()
+ )
+
+ def go():
+ results = q.all()
+ eq_(
+ results,
+ [
+ B(
+ a=A(cs=[C(a_id=2, id=3), C(a_id=2, id=4)], id=2),
+ a_id=2,
+ id=2,
+ ),
+ B(
+ a=A(cs=[C(a_id=3, id=5), C(a_id=3, id=6)], id=3),
+ a_id=3,
+ id=3,
+ ),
+ ],
+ )
+
+ self.assert_sql_execution(
+ testing.db,
+ go,
+ CompiledSQL(
+ "SELECT anon_1.anon_2_b_id AS anon_1_anon_2_b_id, "
+ "anon_1.anon_2_b_a_id AS anon_1_anon_2_b_a_id FROM "
+ "(SELECT anon_2.b_id AS anon_2_b_id, anon_2.b_a_id "
+ "AS anon_2_b_a_id FROM (SELECT b.id AS b_id, b.a_id "
+ "AS b_a_id FROM b JOIN a ON a.id = b.a_id "
+ "WHERE b.id < :id_1 AND a.id > :id_2) AS anon_2) AS anon_1"
+ ),
+ CompiledSQL(
+ "SELECT a.id AS a_id, anon_1.anon_2_anon_3_b_a_id AS "
+ "anon_1_anon_2_anon_3_b_a_id FROM (SELECT DISTINCT "
+ "anon_2.anon_3_b_a_id AS anon_2_anon_3_b_a_id FROM "
+ "(SELECT anon_3.b_id AS anon_3_b_id, anon_3.b_a_id "
+ "AS anon_3_b_a_id FROM (SELECT b.id AS b_id, b.a_id "
+ "AS b_a_id FROM b JOIN a ON a.id = b.a_id "
+ "WHERE b.id < :id_1 AND a.id > :id_2) AS anon_3) "
+ "AS anon_2) AS anon_1 JOIN a "
+ "ON a.id = anon_1.anon_2_anon_3_b_a_id"
+ ),
+ CompiledSQL(
+ "SELECT c.id AS c_id, c.a_id AS c_a_id, a_1.id "
+ "AS a_1_id FROM (SELECT DISTINCT anon_2.anon_3_b_a_id AS "
+ "anon_2_anon_3_b_a_id FROM "
+ "(SELECT anon_3.b_id AS anon_3_b_id, anon_3.b_a_id "
+ "AS anon_3_b_a_id FROM (SELECT b.id AS b_id, b.a_id "
+ "AS b_a_id FROM b JOIN a ON a.id = b.a_id "
+ "WHERE b.id < :id_1 AND a.id > :id_2) AS anon_3) "
+ "AS anon_2) AS anon_1 JOIN a AS a_1 ON a_1.id = "
+ "anon_1.anon_2_anon_3_b_a_id JOIN c ON a_1.id = c.a_id "
+ "ORDER BY c.id"
+ ),
+ )
+
+ s.close()
+
+ def test_subq_w_from_self_two(self):
+
+ A, B, C = self.classes("A", "B", "C")
+
+ s = Session()
+ cache = {}
+
+ for i in range(3):
+
+ def go():
+ with self._from_self_deprecated():
+ q = (
+ s.query(B)
+ .execution_options(compiled_cache=cache)
+ .join(B.a)
+ .from_self()
+ )
+ q = q.options(subqueryload(B.ds))
+
+ q.all()
+
+ self.assert_sql_execution(
+ testing.db,
+ go,
+ CompiledSQL(
+ "SELECT anon_1.b_id AS anon_1_b_id, anon_1.b_a_id AS "
+ "anon_1_b_a_id FROM (SELECT b.id AS b_id, b.a_id "
+ "AS b_a_id FROM b JOIN a ON a.id = b.a_id) AS anon_1"
+ ),
+ CompiledSQL(
+ "SELECT d.id AS d_id, d.b_id AS d_b_id, "
+ "anon_1.anon_2_b_id AS anon_1_anon_2_b_id "
+ "FROM (SELECT anon_2.b_id AS anon_2_b_id FROM "
+ "(SELECT b.id AS b_id, b.a_id AS b_a_id FROM b "
+ "JOIN a ON a.id = b.a_id) AS anon_2) AS anon_1 "
+ "JOIN d ON anon_1.anon_2_b_id = d.b_id ORDER BY d.id"
+ ),
+ )
+ s.close()
+
+
class SessionTest(fixtures.RemovesEvents, _LocalFixture):
def test_subtransactions_deprecated(self):
s1 = Session(testing.db)
)
-class FromSelfTest(QueryTest, AssertsCompiledSQL):
+class EntityFromSubqueryTest(QueryTest, AssertsCompiledSQL):
+ # formerly FromSelfTest
+
__dialect__ = "default"
def test_filter(self):
User = self.classes.User
+ subq = select(User).filter(User.id.in_([8, 9])).subquery()
+ q = create_session().query(aliased(User, subq))
eq_(
- [User(id=8), User(id=9)],
- create_session()
- .query(User)
- .filter(User.id.in_([8, 9]))
- .from_self()
- .all(),
+ [User(id=8), User(id=9)], q.all(),
)
+ subq = select(User).order_by(User.id).slice(1, 3).subquery()
+ q = create_session().query(aliased(User, subq))
+ eq_([User(id=8), User(id=9)], q.all())
+
+ subq = select(User).filter(User.id.in_([8, 9])).subquery()
+ u = aliased(User, subq)
+ q = create_session().query(u).order_by(u.id)
eq_(
- [User(id=8), User(id=9)],
- create_session()
- .query(User)
- .order_by(User.id)
- .slice(1, 3)
- .from_self()
- .all(),
- )
- eq_(
- [User(id=8)],
- list(
- create_session()
- .query(User)
- .filter(User.id.in_([8, 9]))
- .from_self()
- .order_by(User.id)[0:1]
- ),
+ [User(id=8)], list(q[0:1]),
)
def test_join(self):
User, Address = self.classes.User, self.classes.Address
+ stmt = select(User).filter(User.id.in_([8, 9])).subquery()
+
+ u = aliased(User, stmt)
+
+ q = (
+ create_session()
+ .query(u)
+ .join(u.addresses)
+ .add_entity(Address)
+ .order_by(u.id, Address.id)
+ )
eq_(
[
(User(id=8), Address(id=2)),
(User(id=8), Address(id=4)),
(User(id=9), Address(id=5)),
],
- create_session()
- .query(User)
- .filter(User.id.in_([8, 9]))
- .from_self()
- .join("addresses")
- .add_entity(Address)
- .order_by(User.id, Address.id)
- .all(),
+ q.all(),
)
def test_group_by(self):
Address = self.classes.Address
- eq_(
- create_session()
- .query(Address.user_id, func.count(Address.id).label("count"))
+ subq = (
+ select(Address.user_id, func.count(Address.id).label("count"))
.group_by(Address.user_id)
.order_by(Address.user_id)
- .all(),
- [(7, 1), (8, 3), (9, 1)],
+ .subquery()
)
-
+ # there's no reason to do aliased(Address) in this case but we're just
+ # testing
+ aq = aliased(Address, subq)
+ q = create_session().query(aq.user_id, subq.c.count)
eq_(
+ q.all(), [(7, 1), (8, 3), (9, 1)],
+ )
+
+ subq = select(Address.user_id, Address.id)
+ aq = aliased(Address, subq)
+
+ q = (
create_session()
- .query(Address.user_id, Address.id)
- .from_self(Address.user_id, func.count(Address.id))
- .group_by(Address.user_id)
- .order_by(Address.user_id)
- .all(),
- [(7, 1), (8, 3), (9, 1)],
+ .query(aq.user_id, func.count(aq.id))
+ .group_by(aq.user_id)
+ .order_by(aq.user_id)
+ )
+
+ eq_(
+ q.all(), [(7, 1), (8, 3), (9, 1)],
+ )
+
+ def test_error_w_aliased_against_select(self):
+ User = self.classes.User
+
+ s = create_session()
+
+ stmt = select(User.id)
+
+ assert_raises_message(
+ sa_exc.ArgumentError,
+ "Column expression or FROM clause expected, got "
+ "<sqlalchemy.sql.selectable.Select .*> object resolved from "
+ "<AliasedClass .* User> object. To create a FROM clause from "
+ "a <class 'sqlalchemy.sql.selectable.Select'> object",
+ s.query,
+ aliased(User, stmt),
)
def test_having(self):
s = create_session()
+ stmt = (
+ select(User.id)
+ .group_by(User.id)
+ .having(User.id > 5)
+ .apply_labels()
+ .subquery()
+ )
+
+ q = s.query(aliased(User, stmt))
self.assert_compile(
- s.query(User.id).group_by(User.id).having(User.id > 5).from_self(),
+ q,
"SELECT anon_1.users_id AS anon_1_users_id FROM "
"(SELECT users.id AS users_id FROM users GROUP "
"BY users.id HAVING users.id > :id_1) AS anon_1",
)
def test_no_joinedload(self):
- """test that joinedloads are pushed outwards and not rendered in
- subqueries."""
User = self.classes.User
s = create_session()
- self.assert_compile(
- s.query(User)
+ subq = (
+ select(User)
.options(joinedload(User.addresses))
- .from_self()
- .statement,
+ .apply_labels()
+ .subquery()
+ )
+
+ uq = aliased(User, subq)
+ q = s.query(uq)
+
+ # in 2.0 style, joinedload in the subquery is just ignored
+ self.assert_compile(
+ q.statement,
+ "SELECT anon_1.users_id, anon_1.users_name FROM (SELECT "
+ "users.id AS users_id, users.name AS users_name FROM users) "
+ "AS anon_1",
+ )
+
+ # needs to be on the outside
+ self.assert_compile(
+ q.options(joinedload(uq.addresses)).statement,
"SELECT anon_1.users_id, anon_1.users_name, addresses_1.id, "
"addresses_1.user_id, addresses_1.email_address FROM "
"(SELECT users.id AS users_id, users.name AS "
s = create_session()
ualias = aliased(User)
+
+ subq = select(User, ualias).filter(User.id > ualias.id).subquery()
+
+ uq1 = aliased(User, subq)
+ uq2 = aliased(ualias, subq)
+
+ q = s.query(uq1.name, uq2.name).order_by(uq1.name, uq2.name)
+
eq_(
- s.query(User, ualias)
- .filter(User.id > ualias.id)
- .from_self(User.name, ualias.name)
- .order_by(User.name, ualias.name)
- .all(),
+ q.all(),
[
("chuck", "ed"),
("chuck", "fred"),
],
)
+ q = (
+ s.query(uq1.name, uq2.name)
+ .filter(uq2.name == "ed")
+ .order_by(uq1.name, uq2.name)
+ )
+
eq_(
- s.query(User, ualias)
- .filter(User.id > ualias.id)
- .from_self(User.name, ualias.name)
- .filter(ualias.name == "ed")
- .order_by(User.name, ualias.name)
- .all(),
- [("chuck", "ed"), ("fred", "ed")],
+ q.all(), [("chuck", "ed"), ("fred", "ed")],
+ )
+
+ q = (
+ s.query(uq2.name, Address.email_address)
+ .join(uq2.addresses)
+ .order_by(uq2.name, Address.email_address)
)
eq_(
- s.query(User, ualias)
- .filter(User.id > ualias.id)
- .from_self(ualias.name, Address.email_address)
- .join(ualias.addresses)
- .order_by(ualias.name, Address.email_address)
- .all(),
+ q.all(),
[
("ed", "fred@fred.com"),
("jack", "ed@bettyboop.com"),
sess = create_session()
- eq_(
- sess.query(User, Address)
+ subq = (
+ select(User, Address)
.filter(User.id == Address.user_id)
.filter(Address.id.in_([2, 5]))
- .from_self()
- .all(),
+ .subquery()
+ )
+
+ uq = aliased(User, subq)
+ aq = aliased(Address, subq)
+
+ eq_(
+ sess.query(uq, aq).all(),
[(User(id=8), Address(id=2)), (User(id=9), Address(id=5))],
)
eq_(
- sess.query(User, Address)
- .filter(User.id == Address.user_id)
- .filter(Address.id.in_([2, 5]))
- .from_self()
- .options(joinedload("addresses"))
- .first(),
+ sess.query(uq, aq).options(joinedload(uq.addresses)).first(),
(
User(id=8, addresses=[Address(), Address(), Address()]),
Address(id=2),
)
def test_multiple_with_column_entities_oldstyle(self):
- # relies upon _orm_only_from_obj_alias setting
+ # this is now very awkward and not very useful
User = self.classes.User
+ subq = select(User.id).subquery()
+
+ uq = aliased(User, subq)
+
+ subq2 = (
+ select(uq.id)
+ .add_columns(func.count().label("foo"))
+ .group_by(uq.id)
+ .order_by(uq.id)
+ .subquery()
+ )
+
+ uq2 = aliased(User, subq2)
sess = create_session()
eq_(
- sess.query(User.id)
- .from_self()
- .add_columns(func.count().label("foo"))
- .group_by(User.id)
- .order_by(User.id)
- .from_self()
- .all(),
+ sess.query(uq2.id, subq2.c.foo).all(),
[(7, 1), (8, 1), (9, 1), (10, 1)],
)
__dialect__ = "default"
- def test_from_self(self):
- User = self.classes.User
- sess = create_session()
-
- q = sess.query(User).from_self()
- self.assert_compile(
- q.filter(User.name == "ed"),
- "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS "
- "anon_1_users_name FROM (SELECT users.id AS users_id, users.name "
- "AS users_name FROM users) AS anon_1 WHERE anon_1.users_name = "
- ":name_1",
- )
-
- def test_from_self_twice(self):
- User = self.classes.User
- sess = create_session()
-
- q = sess.query(User).from_self(User.id, User.name).from_self()
- self.assert_compile(
- q.filter(User.name == "ed"),
- "SELECT anon_1.anon_2_users_id AS anon_1_anon_2_users_id, "
- "anon_1.anon_2_users_name AS anon_1_anon_2_users_name FROM "
- "(SELECT anon_2.users_id AS anon_2_users_id, anon_2.users_name "
- "AS anon_2_users_name FROM (SELECT users.id AS users_id, "
- "users.name AS users_name FROM users) AS anon_2) AS anon_1 "
- "WHERE anon_1.anon_2_users_name = :name_1",
- )
-
def test_select_entity_from(self):
User = self.classes.User
sess = create_session()
"WHERE c1 = :c1_2) AS anon_1 ORDER BY anon_1.c1",
)
- def test_table_anonymous_expression_from_self_twice_oldstyle(self):
- # relies upon _orm_only_from_obj_alias setting
- from sqlalchemy.sql import column
-
- sess = create_session()
- t1 = table("t1", column("c1"), column("c2"))
- q1 = sess.query(t1.c.c1, t1.c.c2).filter(t1.c.c1 == "dog")
- q1 = q1.from_self().from_self()
- self.assert_compile(
- q1.order_by(t1.c.c1),
- "SELECT anon_1.anon_2_t1_c1 "
- "AS anon_1_anon_2_t1_c1, anon_1.anon_2_t1_c2 "
- "AS anon_1_anon_2_t1_c2 "
- "FROM (SELECT anon_2.t1_c1 AS anon_2_t1_c1, "
- "anon_2.t1_c2 AS anon_2_t1_c2 FROM (SELECT t1.c1 AS t1_c1, t1.c2 "
- "AS t1_c2 FROM t1 WHERE t1.c1 = :c1_1) AS anon_2) AS anon_1 "
- "ORDER BY anon_1.anon_2_t1_c1",
- )
-
def test_table_anonymous_expression_from_self_twice_newstyle(self):
from sqlalchemy.sql import column
"ORDER BY anon_1.anon_2_t1_c1",
)
- def test_anonymous_expression_from_self_twice_oldstyle(self):
- # relies upon _orm_only_from_obj_alias setting
- from sqlalchemy.sql import column
-
- sess = create_session()
- c1, c2 = column("c1"), column("c2")
- q1 = sess.query(c1, c2).filter(c1 == "dog")
- q1 = q1.from_self().from_self()
- self.assert_compile(
- q1.order_by(c1),
- "SELECT anon_1.anon_2_c1 AS anon_1_anon_2_c1, anon_1.anon_2_c2 AS "
- "anon_1_anon_2_c2 FROM (SELECT anon_2.c1 AS anon_2_c1, anon_2.c2 "
- "AS anon_2_c2 "
- "FROM (SELECT c1, c2 WHERE c1 = :c1_1) AS "
- "anon_2) AS anon_1 ORDER BY anon_1.anon_2_c1",
- )
-
def test_anonymous_expression_from_self_twice_newstyle_wlabels(self):
from sqlalchemy.sql import column
"WHERE c1 = :c1_2) AS anon_1 ORDER BY anon_1.foo",
)
- def test_anonymous_expression_plus_flag_aliased_join(self):
- """test that the 'dont alias non-ORM' rule remains for other
- kinds of aliasing when _from_selectable() is used."""
+ def test_anonymous_expression_plus_flag_aliased_join_newstyle(self):
User = self.classes.User
Address = self.classes.Address
sess = create_session()
q1 = sess.query(User.id).filter(User.id > 5)
- q1 = q1.from_self()
- q1 = q1.join(User.addresses, aliased=True).order_by(
- User.id, Address.id, addresses.c.id
+ uq = aliased(User, q1.apply_labels().subquery())
+
+ aa = aliased(Address)
+ q1 = (
+ sess.query(uq.id)
+ .join(uq.addresses.of_type(aa))
+ .order_by(uq.id, aa.id, addresses.c.id)
)
self.assert_compile(
"ORDER BY anon_1.users_id, addresses_1.id, addresses.id",
)
- def test_anonymous_expression_plus_explicit_aliased_join(self):
+ def test_anonymous_expression_plus_explicit_aliased_join_newstyle(self):
"""test that the 'dont alias non-ORM' rule remains for other
kinds of aliasing when _from_selectable() is used."""
addresses = self.tables.addresses
sess = create_session()
- q1 = sess.query(User.id).filter(User.id > 5)
- q1 = q1.from_self()
+ q1 = sess.query(User.id).filter(User.id > 5).apply_labels().subquery()
+
+ uq = aliased(User, q1)
aa = aliased(Address)
- q1 = q1.join(aa, User.addresses).order_by(
- User.id, aa.id, addresses.c.id
+
+ q1 = (
+ sess.query(uq.id)
+ .join(aa, uq.addresses)
+ .order_by(uq.id, aa.id, addresses.c.id)
)
self.assert_compile(
q1,
sess = create_session()
adalias = aliased(Address)
- # select from aliasing + explicit aliasing
- eq_(
+
+ subq = (
sess.query(User, adalias.email_address, adalias.id)
.outerjoin(adalias, User.addresses)
- .from_self(User, adalias.email_address)
- .order_by(User.id, adalias.id)
- .all(),
+ .subquery()
+ )
+ ua = aliased(User, subq)
+ aa = aliased(adalias, subq)
+
+ q = sess.query(ua, aa.email_address).order_by(ua.id, aa.id)
+ # select from aliasing + explicit aliasing
+ eq_(
+ q.all(),
[
(User(name="jack", id=7), "jack@bean.com"),
(User(name="ed", id=8), "ed@wood.com"),
# anon + select from aliasing
aa = aliased(Address)
- eq_(
+
+ subq = (
sess.query(User)
.join(aa, User.addresses)
.filter(aa.email_address.like("%ed%"))
- .from_self()
- .all(),
+ .subquery()
+ )
+ ua = aliased(User, subq)
+
+ eq_(
+ sess.query(ua).all(),
[User(name="ed", id=8), User(name="fred", id=9)],
)
sess = create_session()
adalias = aliased(Address)
- # test eager aliasing, with/without select_entity_from aliasing
- for q in [
+
+ q1 = (
sess.query(User, adalias.email_address)
.outerjoin(adalias, User.addresses)
.options(joinedload(User.addresses))
.order_by(User.id, adalias.id)
- .limit(10),
+ .limit(10)
+ )
+
+ subq = (
sess.query(User, adalias.email_address, adalias.id)
.outerjoin(adalias, User.addresses)
- .from_self(User, adalias.email_address)
- .options(joinedload(User.addresses))
- .order_by(User.id, adalias.id)
- .limit(10),
- ]:
+ .subquery()
+ )
+ ua = aliased(User, subq)
+ aa = aliased(adalias, subq)
+
+ q2 = (
+ sess.query(ua, aa.email_address)
+ .options(joinedload(ua.addresses))
+ .order_by(ua.id, aa.id)
+ .limit(10)
+ )
+
+ # test eager aliasing, with/without select_entity_from aliasing
+ for q in [q1, q2]:
eq_(
q.all(),
[
self.assert_sql_count(testing.db, go, 1)
- @testing.fails_on("firebird", "unknown")
- def test_self_referential(self):
+ def test_self_referential_from_self(self):
Order = self.classes.Order
sess = create_session()
oalias = aliased(Order)
- for q in [
+ q1 = (
sess.query(Order, oalias)
.filter(Order.user_id == oalias.user_id)
.filter(Order.user_id == 7)
.filter(Order.id > oalias.id)
- .order_by(Order.id, oalias.id),
- sess.query(Order, oalias)
- .filter(Order.id > oalias.id)
- .from_self()
- .filter(Order.user_id == oalias.user_id)
- .filter(Order.user_id == 7)
- .order_by(Order.id, oalias.id),
- # same thing, but reversed.
- sess.query(oalias, Order)
- .filter(Order.id < oalias.id)
- .from_self()
- .filter(oalias.user_id == Order.user_id)
- .filter(oalias.user_id == 7)
- .order_by(oalias.id, Order.id),
- # here we go....two layers of aliasing
- sess.query(Order, oalias)
- .filter(Order.user_id == oalias.user_id)
- .filter(Order.user_id == 7)
- .filter(Order.id > oalias.id)
- .from_self()
.order_by(Order.id, oalias.id)
- .limit(10)
- .options(joinedload(Order.items)),
- # gratuitous four layers
+ )
+
+ subq = (
+ sess.query(Order, oalias).filter(Order.id > oalias.id).subquery()
+ )
+ oa, oaa = aliased(Order, subq), aliased(oalias, subq)
+ q2 = (
+ sess.query(oa, oaa)
+ .filter(oa.user_id == oaa.user_id)
+ .filter(oa.user_id == 7)
+ .order_by(oa.id, oaa.id)
+ )
+
+ # same thing, but reversed.
+ subq = (
+ sess.query(oalias, Order).filter(Order.id < oalias.id).subquery()
+ )
+ oa, oaa = aliased(Order, subq), aliased(oalias, subq)
+ q3 = (
+ sess.query(oaa, oa)
+ .filter(oaa.user_id == oa.user_id)
+ .filter(oaa.user_id == 7)
+ .order_by(oaa.id, oa.id)
+ )
+
+ subq = (
sess.query(Order, oalias)
.filter(Order.user_id == oalias.user_id)
.filter(Order.user_id == 7)
.filter(Order.id > oalias.id)
- .from_self()
- .from_self()
- .from_self()
- .order_by(Order.id, oalias.id)
+ .subquery()
+ )
+ oa, oaa = aliased(Order, subq), aliased(oalias, subq)
+
+ # here we go....two layers of aliasing (due to joinedload w/ limit)
+ q4 = (
+ sess.query(oa, oaa)
+ .order_by(oa.id, oaa.id)
+ .limit(10)
+ .options(joinedload(oa.items))
+ )
+
+ # gratuitous four layers
+ subq4 = subq
+ for i in range(4):
+ oa, oaa = aliased(Order, subq4), aliased(oaa, subq4)
+ subq4 = sess.query(oa, oaa).subquery()
+ oa, oaa = aliased(Order, subq4), aliased(oaa, subq4)
+ q5 = (
+ sess.query(oa, oaa)
+ .order_by(oa.id, oaa.id)
.limit(10)
- .options(joinedload(Order.items)),
+ .options(joinedload(oa.items))
+ )
+ for q in [
+ q1,
+ q2,
+ q3,
+ q4,
+ q5,
]:
eq_(
],
)
- def test_from_self_internal_literals_oldstyle(self):
- # relies upon _orm_only_from_obj_alias setting
- Order = self.classes.Order
-
- sess = create_session()
-
- # ensure column expressions are taken from inside the subquery, not
- # restated at the top
- q = (
- sess.query(
- Order.id, Order.description, literal_column("'q'").label("foo")
- )
- .filter(Order.description == "order 3")
- .from_self()
- )
- self.assert_compile(
- q,
- "SELECT anon_1.orders_id AS "
- "anon_1_orders_id, "
- "anon_1.orders_description AS anon_1_orders_description, "
- "anon_1.foo AS anon_1_foo FROM (SELECT "
- "orders.id AS orders_id, "
- "orders.description AS orders_description, "
- "'q' AS foo FROM orders WHERE "
- "orders.description = :description_1) AS "
- "anon_1",
- )
- eq_(q.all(), [(3, "order 3", "q")])
-
def test_from_self_internal_literals_newstyle(self):
Order = self.classes.Order
eq_(a1.username, "jack")
sess = create_session()
- a1 = sess.query(Address).from_self().first()
+ subq = sess.query(Address).subquery()
+ aa = aliased(Address, subq)
+ a1 = sess.query(aa).first()
eq_(a1.username, "jack")
def test_overlap_subquery(self):
s = Session()
- row = (
+
+ subq = (
s.query(self.classes.Foo, self.classes.Bar)
.join(self.classes.Bar, true())
- .from_self()
- .all()[0]
+ .subquery()
)
+ fa = aliased(self.classes.Foo, subq, name="Foo")
+ ba = aliased(self.classes.Bar, subq, name="Bar")
+
+ row = s.query(fa, ba).all()[0]
+
def go():
eq_(row.Foo.id, 1)
eq_(row.Foo.bar_id, 2)
User = self.classes.User
sess = create_session()
- self.assert_compile(
+
+ subq = (
sess.query(User)
.filter(User.name == "ed")
- .from_self()
- .join(User.orders),
+ .apply_labels()
+ .subquery()
+ )
+
+ ua = aliased(User, subq)
+
+ self.assert_compile(
+ sess.query(ua).join(ua.orders),
"SELECT anon_1.users_id AS anon_1_users_id, "
"anon_1.users_name AS anon_1_users_name "
"FROM (SELECT users.id AS users_id, users.name AS users_name "
# explicit onclause with from_self(), means
# the onclause must be aliased against the query's custom
# FROM object
+ subq = sess.query(User).order_by(User.id).offset(2).subquery()
+ ua = aliased(User, subq)
eq_(
- sess.query(User)
- .order_by(User.id)
- .offset(2)
- .from_self()
- .join(Order, User.id == Order.user_id)
- .all(),
+ sess.query(ua).join(Order, ua.id == Order.user_id).all(),
[User(name="fred")],
)
use_default_dialect=True,
)
- def test_from_self_resets_joinpaths(self):
- """test a join from from_self() doesn't confuse joins inside the subquery
- with the outside.
- """
-
- Item, Keyword = self.classes.Item, self.classes.Keyword
-
- sess = create_session()
-
- self.assert_compile(
- sess.query(Item)
- .join(Item.keywords)
- .from_self(Keyword)
- .join(Item.keywords),
- "SELECT keywords.id AS keywords_id, "
- "keywords.name AS keywords_name "
- "FROM (SELECT items.id AS items_id, "
- "items.description AS items_description "
- "FROM items JOIN item_keywords AS item_keywords_1 ON items.id = "
- "item_keywords_1.item_id JOIN keywords "
- "ON keywords.id = item_keywords_1.keyword_id) "
- "AS anon_1 JOIN item_keywords AS item_keywords_2 ON "
- "anon_1.items_id = item_keywords_2.item_id "
- "JOIN keywords ON "
- "keywords.id = item_keywords_2.keyword_id",
- use_default_dialect=True,
- )
-
class JoinFromSelectableTest(fixtures.MappedTest, AssertsCompiledSQL):
__dialect__ = "default"
# n1 is not inside the from_self(), so all cols must be maintained
# on the outside
- self.assert_compile(
+
+ subq = (
sess.query(Node)
.filter(Node.data == "n122")
- .from_self(n1, Node.id),
+ .apply_labels()
+ .subquery()
+ )
+
+ na = aliased(Node, subq)
+
+ self.assert_compile(
+ sess.query(n1, na.id),
"SELECT nodes_1.id AS nodes_1_id, "
"nodes_1.parent_id AS nodes_1_parent_id, "
"nodes_1.data AS nodes_1_data, anon_1.nodes_id AS anon_1_nodes_id "
parent = aliased(Node)
grandparent = aliased(Node)
- q = (
+ subq = (
sess.query(Node, parent, grandparent)
.join(parent, Node.parent)
.join(grandparent, parent.parent)
.filter(Node.data == "n122")
.filter(parent.data == "n12")
.filter(grandparent.data == "n1")
- .from_self()
- .limit(1)
+ .apply_labels()
+ .subquery()
)
+ na = aliased(Node, subq)
+ pa = aliased(parent, subq)
+ ga = aliased(grandparent, subq)
+
+ q = sess.query(na, pa, ga).limit(1)
# parent, grandparent *are* inside the from_self(), so they
# should get aliased to the outside.
parent = aliased(Node)
grandparent = aliased(Node)
- eq_(
+
+ subq = (
sess.query(Node, parent, grandparent)
.join(parent, Node.parent)
.join(grandparent, parent.parent)
.filter(Node.data == "n122")
.filter(parent.data == "n12")
.filter(grandparent.data == "n1")
- .from_self()
- .first(),
+ .subquery()
+ )
+
+ na = aliased(Node, subq)
+ pa = aliased(parent, subq)
+ ga = aliased(grandparent, subq)
+
+ eq_(
+ sess.query(na, pa, ga).first(),
(Node(data="n122"), Node(data="n12"), Node(data="n1")),
)
parent = aliased(Node)
grandparent = aliased(Node)
# same, change order around
- eq_(
+ subq = (
sess.query(parent, grandparent, Node)
.join(parent, Node.parent)
.join(grandparent, parent.parent)
.filter(Node.data == "n122")
.filter(parent.data == "n12")
.filter(grandparent.data == "n1")
- .from_self()
- .first(),
+ .subquery()
+ )
+
+ na = aliased(Node, subq)
+ pa = aliased(parent, subq)
+ ga = aliased(grandparent, subq)
+
+ eq_(
+ sess.query(pa, ga, na).first(),
(Node(data="n12"), Node(data="n1"), Node(data="n122")),
)
parent = aliased(Node)
grandparent = aliased(Node)
- eq_(
+
+ subq = (
sess.query(Node, parent, grandparent)
.join(parent, Node.parent)
.join(grandparent, parent.parent)
.filter(Node.data == "n122")
.filter(parent.data == "n12")
.filter(grandparent.data == "n1")
- .from_self()
- .options(joinedload(Node.children))
- .first(),
+ .subquery()
+ )
+
+ na = aliased(Node, subq)
+ pa = aliased(parent, subq)
+ ga = aliased(grandparent, subq)
+
+ eq_(
+ sess.query(na, pa, ga).options(joinedload(na.children)).first(),
(Node(data="n122"), Node(data="n12"), Node(data="n1")),
)
q.enable_assertions(False).select_from(users)
- # this is fine, however
- q.from_self()
+ with testing.expect_deprecated("The Query.from_self"):
+ # this is fine, however
+ q.from_self()
def test_invalid_select_from(self):
User = self.classes.User
"FROM users ORDER BY label DESC",
)
- def test_columns_augmented_roundtrip_one_from_self(self):
+ def test_columns_augmented_roundtrip_one_from_subq(self):
"""Test workaround for legacy style DISTINCT on extra column.
See #5134
"""
User, Address = self.classes.User, self.classes.Address
-
sess = create_session()
- q = (
+
+ subq = (
sess.query(User, Address.email_address)
.join("addresses")
.distinct()
- .from_self(User)
- .order_by(desc(Address.email_address))
+ .subquery()
)
+ ua = aliased(User, subq)
+ aa = aliased(Address, subq)
+ q = sess.query(ua).order_by(desc(aa.email_address))
eq_([User(id=7), User(id=9), User(id=8)], q.all())
sess = create_session()
- q = (
+ subq = (
sess.query(
User.id,
User.name.label("foo"),
.filter(User.name == "jack")
.filter(User.id + Address.user_id > 0)
.distinct()
- .from_self(User.id, User.name.label("foo"), Address.id)
- .order_by(User.id, User.name, Address.email_address)
+ .subquery()
+ )
+
+ ua, aa = aliased(User, subq), aliased(Address, subq)
+
+ q = sess.query(ua.id, ua.name.label("foo"), aa.id).order_by(
+ ua.id, ua.name, aa.email_address
)
eq_(
sess = create_session()
- q = (
+ subq = (
sess.query(
User.id,
User.name.label("foo"),
)
.distinct()
.order_by(User.id, User.name, Address.email_address)
- .from_self(User.id, User.name.label("foo"), Address.id)
+ .apply_labels()
+ .subquery()
)
+ ua, aa = aliased(User, subq), aliased(Address, subq)
+
+ q = sess.query(ua.id, ua.name.label("foo"), aa.id)
+
# Address.email_address is added because of DISTINCT,
# however User.id, User.name are not b.c. they're already there,
# even though User.name is labeled
sess = create_session()
- q = (
+ subq = (
sess.query(
User.id,
User.name.label("foo"),
)
.distinct(Address.email_address)
.order_by(User.id, User.name, Address.email_address)
- .from_self(User.id, User.name.label("foo"), Address.id)
+ .apply_labels()
+ .subquery()
)
+ ua = aliased(User, subq)
+ aa = aliased(Address, subq)
+ q = sess.query(ua.id, ua.name.label("foo"), aa.id)
+
# Address.email_address is added because of DISTINCT,
# however User.id, User.name are not b.c. they're already there,
# even though User.name is labeled
self.assert_sql_count(testing.db, go, 2)
- def test_from_aliased(self):
+ def user_dingaling_fixture(self):
users, Dingaling, User, dingalings, Address, addresses = (
self.tables.users,
self.classes.Dingaling,
"addresses": relationship(Address, order_by=Address.id)
},
)
- sess = create_session()
+ return User, Dingaling, Address
- u = aliased(User)
+ def test_from_aliased_w_cache_one(self):
+ User, Dingaling, Address = self.user_dingaling_fixture()
- q = sess.query(u).options(selectinload(u.addresses))
+ for i in range(3):
- def go():
- eq_(
- [
- User(
- id=7,
- addresses=[
- Address(id=1, email_address="jack@bean.com")
- ],
- )
- ],
- q.filter(u.id == 7).all(),
- )
+ def go():
- self.assert_sql_count(testing.db, go, 2)
+ sess = create_session()
- def go():
- eq_(self.static.user_address_result, q.order_by(u.id).all())
+ u = aliased(User)
- self.assert_sql_count(testing.db, go, 2)
+ q = sess.query(u).options(selectinload(u.addresses))
- q = sess.query(u).options(
- selectinload(u.addresses).selectinload(Address.dingalings)
- )
+ eq_(
+ [
+ User(
+ id=7,
+ addresses=[
+ Address(id=1, email_address="jack@bean.com")
+ ],
+ )
+ ],
+ q.filter(u.id == 7).all(),
+ )
- def go():
- eq_(
- [
- User(
- id=8,
- addresses=[
- Address(
- id=2,
- email_address="ed@wood.com",
- dingalings=[Dingaling()],
- ),
- Address(id=3, email_address="ed@bettyboop.com"),
- Address(id=4, email_address="ed@lala.com"),
- ],
- ),
- User(
- id=9,
- addresses=[Address(id=5, dingalings=[Dingaling()])],
- ),
- ],
- q.filter(u.id.in_([8, 9])).all(),
- )
+ self.assert_sql_count(testing.db, go, 2)
- self.assert_sql_count(testing.db, go, 3)
+ def test_from_aliased_w_cache_two(self):
+ User, Dingaling, Address = self.user_dingaling_fixture()
+
+ for i in range(3):
+
+ def go():
+ sess = create_session()
+
+ u = aliased(User)
+
+ q = sess.query(u).options(selectinload(u.addresses))
+
+ eq_(self.static.user_address_result, q.order_by(u.id).all())
+
+ self.assert_sql_count(testing.db, go, 2)
+
+ def test_from_aliased_w_cache_three(self):
+
+ User, Dingaling, Address = self.user_dingaling_fixture()
+
+ for i in range(3):
+
+ def go():
+ sess = create_session()
+
+ u = aliased(User)
+
+ q = sess.query(u).options(
+ selectinload(u.addresses).selectinload(Address.dingalings)
+ )
+ eq_(
+ [
+ User(
+ id=8,
+ addresses=[
+ Address(
+ id=2,
+ email_address="ed@wood.com",
+ dingalings=[Dingaling()],
+ ),
+ Address(
+ id=3, email_address="ed@bettyboop.com"
+ ),
+ Address(id=4, email_address="ed@lala.com"),
+ ],
+ ),
+ User(
+ id=9,
+ addresses=[
+ Address(id=5, dingalings=[Dingaling()])
+ ],
+ ),
+ ],
+ q.filter(u.id.in_([8, 9])).all(),
+ )
+
+ self.assert_sql_count(testing.db, go, 3)
def test_from_get(self):
users, Address, addresses, User = (
eq_(len(u1.addresses), 1)
eq_(len(u2.addresses), 3)
- def test_from_aliased(self):
+ def user_dingaling_fixture(self):
users, Dingaling, User, dingalings, Address, addresses = (
self.tables.users,
self.classes.Dingaling,
"addresses": relationship(Address, order_by=Address.id)
},
)
- sess = create_session()
+ return User, Dingaling, Address
- u = aliased(User)
+ def test_from_aliased_w_cache_one(self):
+ User, Dingaling, Address = self.user_dingaling_fixture()
- q = sess.query(u).options(subqueryload(u.addresses))
+ for i in range(3):
+ sess = create_session()
- def go():
- eq_(
- [
- User(
- id=7,
- addresses=[
- Address(id=1, email_address="jack@bean.com")
- ],
- )
- ],
- q.filter(u.id == 7).all(),
- )
+ u = aliased(User)
- self.assert_sql_count(testing.db, go, 2)
+ q = sess.query(u).options(subqueryload(u.addresses))
- def go():
- eq_(self.static.user_address_result, q.order_by(u.id).all())
+ def go():
+ eq_(
+ [
+ User(
+ id=7,
+ addresses=[
+ Address(id=1, email_address="jack@bean.com")
+ ],
+ )
+ ],
+ q.filter(u.id == 7).all(),
+ )
- self.assert_sql_count(testing.db, go, 2)
+ self.assert_sql_count(testing.db, go, 2)
- q = sess.query(u).options(
- subqueryload(u.addresses).subqueryload(Address.dingalings)
- )
+ def test_from_aliased_w_cache_two(self):
+ User, Dingaling, Address = self.user_dingaling_fixture()
- def go():
- eq_(
- [
- User(
- id=8,
- addresses=[
- Address(
- id=2,
- email_address="ed@wood.com",
- dingalings=[Dingaling()],
- ),
- Address(id=3, email_address="ed@bettyboop.com"),
- Address(id=4, email_address="ed@lala.com"),
- ],
- ),
- User(
- id=9,
- addresses=[Address(id=5, dingalings=[Dingaling()])],
- ),
- ],
- q.filter(u.id.in_([8, 9])).all(),
+ for i in range(3):
+ sess = create_session()
+
+ u = aliased(User)
+
+ q = sess.query(u).options(subqueryload(u.addresses))
+
+ def go():
+ eq_(self.static.user_address_result, q.order_by(u.id).all())
+
+ self.assert_sql_count(testing.db, go, 2)
+
+ def test_from_aliased_w_cache_three(self):
+ User, Dingaling, Address = self.user_dingaling_fixture()
+
+ for i in range(3):
+ sess = create_session()
+
+ u = aliased(User)
+ q = sess.query(u).options(
+ subqueryload(u.addresses).subqueryload(Address.dingalings)
)
- self.assert_sql_count(testing.db, go, 3)
+ def go():
+ eq_(
+ [
+ User(
+ id=8,
+ addresses=[
+ Address(
+ id=2,
+ email_address="ed@wood.com",
+ dingalings=[Dingaling()],
+ ),
+ Address(
+ id=3, email_address="ed@bettyboop.com"
+ ),
+ Address(id=4, email_address="ed@lala.com"),
+ ],
+ ),
+ User(
+ id=9,
+ addresses=[
+ Address(id=5, dingalings=[Dingaling()])
+ ],
+ ),
+ ],
+ q.filter(u.id.in_([8, 9])).all(),
+ )
+
+ self.assert_sql_count(testing.db, go, 3)
def test_from_get(self):
users, Address, addresses, User = (
is_true("c2_m2o" in a1.b.__dict__)
-class FromSelfTest(fixtures.DeclarativeMappedTest):
+class FromSubqTest(fixtures.DeclarativeMappedTest):
"""because subqueryloader relies upon the .subquery() method, this means
if the original Query has a from_self() present, it needs to create
.subquery() in terms of the Query class as a from_self() selectable
cache = {}
for i in range(3):
- q = (
+
+ subq = (
s.query(B)
- .execution_options(compiled_cache=cache)
.join(B.a)
.filter(B.id < 4)
.filter(A.id > 1)
- .from_self()
- .options(subqueryload(B.a).subqueryload(A.cs))
- .from_self()
+ .subquery()
+ )
+
+ bb = aliased(B, subq)
+
+ subq2 = s.query(bb).subquery()
+
+ bb2 = aliased(bb, subq2)
+
+ q = (
+ s.query(bb2)
+ .execution_options(compiled_cache=cache)
+ .options(subqueryload(bb2.a).subqueryload(A.cs))
)
def go():
testing.db,
go,
CompiledSQL(
- "SELECT anon_1.anon_2_b_id AS anon_1_anon_2_b_id, "
- "anon_1.anon_2_b_a_id AS anon_1_anon_2_b_a_id FROM "
- "(SELECT anon_2.b_id AS anon_2_b_id, anon_2.b_a_id "
- "AS anon_2_b_a_id FROM (SELECT b.id AS b_id, b.a_id "
- "AS b_a_id FROM b JOIN a ON a.id = b.a_id "
+ "SELECT anon_1.id AS anon_1_id, "
+ "anon_1.a_id AS anon_1_a_id FROM "
+ "(SELECT anon_2.id AS id, anon_2.a_id "
+ "AS a_id FROM (SELECT b.id AS id, b.a_id "
+ "AS a_id FROM b JOIN a ON a.id = b.a_id "
"WHERE b.id < :id_1 AND a.id > :id_2) AS anon_2) AS anon_1"
),
CompiledSQL(
- "SELECT a.id AS a_id, anon_1.anon_2_anon_3_b_a_id AS "
- "anon_1_anon_2_anon_3_b_a_id FROM (SELECT DISTINCT "
- "anon_2.anon_3_b_a_id AS anon_2_anon_3_b_a_id FROM "
- "(SELECT anon_3.b_id AS anon_3_b_id, anon_3.b_a_id "
- "AS anon_3_b_a_id FROM (SELECT b.id AS b_id, b.a_id "
- "AS b_a_id FROM b JOIN a ON a.id = b.a_id "
+ "SELECT a.id AS a_id, anon_1.anon_2_a_id AS "
+ "anon_1_anon_2_a_id FROM (SELECT DISTINCT "
+ "anon_2.a_id AS anon_2_a_id FROM "
+ "(SELECT anon_3.id AS id, anon_3.a_id "
+ "AS a_id FROM (SELECT b.id AS id, b.a_id "
+ "AS a_id FROM b JOIN a ON a.id = b.a_id "
"WHERE b.id < :id_1 AND a.id > :id_2) AS anon_3) "
"AS anon_2) AS anon_1 JOIN a "
- "ON a.id = anon_1.anon_2_anon_3_b_a_id"
+ "ON a.id = anon_1.anon_2_a_id"
),
CompiledSQL(
"SELECT c.id AS c_id, c.a_id AS c_a_id, a_1.id "
- "AS a_1_id FROM (SELECT DISTINCT anon_2.anon_3_b_a_id AS "
- "anon_2_anon_3_b_a_id FROM "
- "(SELECT anon_3.b_id AS anon_3_b_id, anon_3.b_a_id "
- "AS anon_3_b_a_id FROM (SELECT b.id AS b_id, b.a_id "
- "AS b_a_id FROM b JOIN a ON a.id = b.a_id "
+ "AS a_1_id FROM (SELECT DISTINCT anon_2.a_id AS "
+ "anon_2_a_id FROM "
+ "(SELECT anon_3.id AS id, anon_3.a_id "
+ "AS a_id FROM (SELECT b.id AS id, b.a_id "
+ "AS a_id FROM b JOIN a ON a.id = b.a_id "
"WHERE b.id < :id_1 AND a.id > :id_2) AS anon_3) "
"AS anon_2) AS anon_1 JOIN a AS a_1 ON a_1.id = "
- "anon_1.anon_2_anon_3_b_a_id JOIN c ON a_1.id = c.a_id "
+ "anon_1.anon_2_a_id JOIN c ON a_1.id = c.a_id "
"ORDER BY c.id"
),
)
for i in range(3):
def go():
+
+ subq = s.query(B).join(B.a).subquery()
+
+ bq = aliased(B, subq)
+
q = (
- s.query(B)
+ s.query(bq)
.execution_options(compiled_cache=cache)
- .join(B.a)
- .from_self()
+ .options(subqueryload(bq.ds))
)
- q = q.options(subqueryload(B.ds))
q.all()
testing.db,
go,
CompiledSQL(
- "SELECT anon_1.b_id AS anon_1_b_id, anon_1.b_a_id AS "
- "anon_1_b_a_id FROM (SELECT b.id AS b_id, b.a_id "
- "AS b_a_id FROM b JOIN a ON a.id = b.a_id) AS anon_1"
+ "SELECT anon_1.id AS anon_1_id, anon_1.a_id AS "
+ "anon_1_a_id FROM (SELECT b.id AS id, b.a_id "
+ "AS a_id FROM b JOIN a ON a.id = b.a_id) AS anon_1"
),
CompiledSQL(
"SELECT d.id AS d_id, d.b_id AS d_b_id, "
- "anon_1.anon_2_b_id AS anon_1_anon_2_b_id "
- "FROM (SELECT anon_2.b_id AS anon_2_b_id FROM "
- "(SELECT b.id AS b_id, b.a_id AS b_a_id FROM b "
+ "anon_1.anon_2_id AS anon_1_anon_2_id "
+ "FROM (SELECT anon_2.id AS anon_2_id FROM "
+ "(SELECT b.id AS id, b.a_id AS a_id FROM b "
"JOIN a ON a.id = b.a_id) AS anon_2) AS anon_1 "
- "JOIN d ON anon_1.anon_2_b_id = d.b_id ORDER BY d.id"
+ "JOIN d ON anon_1.anon_2_id = d.b_id ORDER BY d.id"
),
)
s.close()
s.query(User).select_from(Address),
r"join\(\), outerjoin\(\), select_from\(\), or from_self\(\)",
),
- (
- s.query(User).from_self(),
- r"join\(\), outerjoin\(\), select_from\(\), or from_self\(\)",
- ),
):
assert_raises_message(
exc.InvalidRequestError,