be clearer in its intent.
- Fine tuning of Query clause adaptation when
from_self(), union(), or other "select from
myself" operation, such that plain SQL expression
elements added to filter(), order_by() etc.
which are present in the nested "from myself"
query *will* be adapted in the same way an ORM
expression element will, since these
elements are otherwise not easily accessible.
[ticket:2155]
fixed up some of the error messages tailored
in [ticket:2069]
+ - Fine tuning of Query clause adaptation when
+ from_self(), union(), or other "select from
+ myself" operation, such that plain SQL expression
+ elements added to filter(), order_by() etc.
+ which are present in the nested "from myself"
+ query *will* be adapted in the same way an ORM
+ expression element will, since these
+ elements are otherwise not easily accessible.
+ [ticket:2155]
+
- It is an error to call query.get() when the
given entity is not a single, full class
entity or mapper (i.e. a column). This is
from sqlalchemy.orm.util import (
AliasedClass, ORMAdapter, _entity_descriptor, _entity_info,
_is_aliased_class, _is_mapped_class, _orm_columns, _orm_selectable,
- join as orm_join,with_parent, _attr_as_key
+ join as orm_join,with_parent, _attr_as_key, aliased
)
__all__ = ['Query', 'QueryContext', 'aliased']
-aliased = AliasedClass
-
def _generative(*assertions):
"""Mark a method as generative."""
if alias:
return alias.adapt_clause(element)
- def __replace_element(self, adapters):
- def replace(elem):
- if '_halt_adapt' in elem._annotations:
- return elem
-
- for adapter in adapters:
- e = adapter(elem)
- if e is not None:
- return e
- return replace
-
- def __replace_orm_element(self, adapters):
- def replace(elem):
- if '_halt_adapt' in elem._annotations:
- return elem
-
- if "_orm_adapt" in elem._annotations \
- or "parententity" in elem._annotations:
- for adapter in adapters:
- e = adapter(elem)
- if e is not None:
- return e
- return replace
-
- @_generative()
- def _adapt_all_clauses(self):
- self._disable_orm_filtering = True
-
def _adapt_col_list(self, cols):
return [
self._adapt_clause(
for o in cols
]
+ @_generative()
+ def _adapt_all_clauses(self):
+ self._orm_only_adapt = False
+
def _adapt_clause(self, clause, as_filter, orm_only):
+ """Adapt incoming clauses to transformations which have been applied
+ within this query."""
+
adapters = []
+
+ # do we adapt all expression elements or only those
+ # tagged as 'ORM' constructs ?
+ orm_only = getattr(self, '_orm_only_adapt', orm_only)
+
if as_filter and self._filter_aliases:
for fa in self._filter_aliases._visitor_iterator:
- adapters.append(fa.replace)
+ adapters.append(
+ (
+ orm_only, fa.replace
+ )
+ )
if self._from_obj_alias:
- adapters.append(self._from_obj_alias.replace)
+ # for the "from obj" alias, apply extra rule to the
+ # 'ORM only' check, if this query were generated from a
+ # subquery of itself, i.e. _from_selectable(), apply adaption
+ # to all SQL constructs.
+ adapters.append(
+ (
+ getattr(self, '_orm_only_from_obj_alias', orm_only),
+ self._from_obj_alias.replace
+ )
+ )
if self._polymorphic_adapters:
- adapters.append(self.__adapt_polymorphic_element)
+ adapters.append(
+ (
+ orm_only, self.__adapt_polymorphic_element
+ )
+ )
if not adapters:
return clause
- if getattr(self, '_disable_orm_filtering', not orm_only):
- return visitors.replacement_traverse(
- clause,
- {'column_collections':False},
- self.__replace_element(adapters)
- )
- else:
- return visitors.replacement_traverse(
- clause,
- {'column_collections':False},
- self.__replace_orm_element(adapters)
- )
+ def replace(elem):
+ if '_halt_adapt' in elem._annotations:
+ return elem
+
+ for _orm_only, adapter in adapters:
+ # if 'orm only', look for ORM annotations
+ # in the element before adapting.
+ if not _orm_only or \
+ '_orm_adapt' in elem._annotations or \
+ "parententity" in elem._annotations:
+
+ e = adapter(elem)
+ if e is not None:
+ return e
+
+ return visitors.replacement_traverse(
+ clause,
+ {'column_collections':False},
+ replace
+ )
def _entity_zero(self):
return self._entities[0]
):
self.__dict__.pop(attr, None)
self._set_select_from(fromclause)
+
+ # this enables clause adaptation for non-ORM
+ # expressions.
+ self._orm_only_from_obj_alias = False
+
old_entities = self._entities
self._entities = []
for e in old_entities:
# until reset_joinpoint() is called.
if need_adapter:
self._filter_aliases = ORMAdapter(right,
- equivalents=right_mapper._equivalent_columns,
+ equivalents=right_mapper and right_mapper._equivalent_columns or {},
chain_to=self._filter_aliases)
# if the onclause is a ClauseElement, adapt it with any
)
+class ColumnAccessTest(QueryTest, AssertsCompiledSQL):
+ """test access of columns after _from_selectable has been applied"""
+
+ __dialect__ = 'default'
+
+ def test_from_self(self):
+ User = self.classes.User
+ sess = create_session()
+
+ q = sess.query(User).from_self()
+ self.assert_compile(
+ q.filter(User.name=='ed'),
+ "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS "
+ "anon_1_users_name FROM (SELECT users.id AS users_id, users.name "
+ "AS users_name FROM users) AS anon_1 WHERE anon_1.users_name = "
+ ":name_1"
+ )
+
+ def test_from_self_twice(self):
+ User = self.classes.User
+ sess = create_session()
+
+ q = sess.query(User).from_self(User.id, User.name).from_self()
+ self.assert_compile(
+ q.filter(User.name=='ed'),
+ "SELECT anon_1.anon_2_users_id AS anon_1_anon_2_users_id, "
+ "anon_1.anon_2_users_name AS anon_1_anon_2_users_name FROM "
+ "(SELECT anon_2.users_id AS anon_2_users_id, anon_2.users_name "
+ "AS anon_2_users_name FROM (SELECT users.id AS users_id, "
+ "users.name AS users_name FROM users) AS anon_2) AS anon_1 "
+ "WHERE anon_1.anon_2_users_name = :name_1"
+ )
+
+ def test_select_from(self):
+ User = self.classes.User
+ sess = create_session()
+
+ q = sess.query(User)
+ q = sess.query(User).select_from(q.statement)
+ self.assert_compile(
+ q.filter(User.name=='ed'),
+ "SELECT anon_1.id AS anon_1_id, anon_1.name AS anon_1_name "
+ "FROM (SELECT users.id AS id, users.name AS name FROM "
+ "users) AS anon_1 WHERE anon_1.name = :name_1"
+ )
+
+ def test_anonymous_expression(self):
+ from sqlalchemy.sql import column
+
+ sess = create_session()
+ c1, c2 = column('c1'), column('c2')
+ q1 = sess.query(c1, c2).filter(c1 == 'dog')
+ q2 = sess.query(c1, c2).filter(c1 == 'cat')
+ q3 = q1.union(q2)
+ self.assert_compile(
+ q3.order_by(c1),
+ "SELECT anon_1.c1 AS anon_1_c1, anon_1.c2 "
+ "AS anon_1_c2 FROM (SELECT c1 AS c1, c2 AS c2 WHERE "
+ "c1 = :c1_1 UNION SELECT c1 AS c1, c2 AS c2 "
+ "WHERE c1 = :c1_2) AS anon_1 ORDER BY anon_1.c1"
+ )
+
+ def test_anonymous_expression_from_self_twice(self):
+ from sqlalchemy.sql import column
+
+ sess = create_session()
+ c1, c2 = column('c1'), column('c2')
+ q1 = sess.query(c1, c2).filter(c1 == 'dog')
+ q1 = q1.from_self().from_self()
+ self.assert_compile(
+ q1.order_by(c1),
+ "SELECT anon_1.anon_2_c1 AS anon_1_anon_2_c1, anon_1.anon_2_c2 AS "
+ "anon_1_anon_2_c2 FROM (SELECT anon_2.c1 AS anon_2_c1, anon_2.c2 "
+ "AS anon_2_c2 FROM (SELECT c1 AS c1, c2 AS c2 WHERE c1 = :c1_1) AS "
+ "anon_2) AS anon_1 ORDER BY anon_1.anon_2_c1"
+ )
+
+ def test_anonymous_expression_union(self):
+ from sqlalchemy.sql import column
+
+ sess = create_session()
+ c1, c2 = column('c1'), column('c2')
+ q1 = sess.query(c1, c2).filter(c1 == 'dog')
+ q2 = sess.query(c1, c2).filter(c1 == 'cat')
+ q3 = q1.union(q2)
+ self.assert_compile(
+ q3.order_by(c1),
+ "SELECT anon_1.c1 AS anon_1_c1, anon_1.c2 "
+ "AS anon_1_c2 FROM (SELECT c1 AS c1, c2 AS c2 WHERE "
+ "c1 = :c1_1 UNION SELECT c1 AS c1, c2 AS c2 "
+ "WHERE c1 = :c1_2) AS anon_1 ORDER BY anon_1.c1"
+ )
+
+ def test_table_anonymous_expression_from_self_twice(self):
+ from sqlalchemy.sql import column, table
+
+ sess = create_session()
+ t1 = table('t1', column('c1'), column('c2'))
+ q1 = sess.query(t1.c.c1, t1.c.c2).filter(t1.c.c1 == 'dog')
+ q1 = q1.from_self().from_self()
+ self.assert_compile(
+ q1.order_by(t1.c.c1),
+ "SELECT anon_1.anon_2_t1_c1 AS anon_1_anon_2_t1_c1, anon_1.anon_2_t1_c2 "
+ "AS anon_1_anon_2_t1_c2 FROM (SELECT anon_2.t1_c1 AS anon_2_t1_c1, "
+ "anon_2.t1_c2 AS anon_2_t1_c2 FROM (SELECT t1.c1 AS t1_c1, t1.c2 "
+ "AS t1_c2 FROM t1 WHERE t1.c1 = :c1_1) AS anon_2) AS anon_1 ORDER BY "
+ "anon_1.anon_2_t1_c1"
+ )
+
+ def test_anonymous_labeled_expression(self):
+ from sqlalchemy.sql import column
+
+ sess = create_session()
+ c1, c2 = column('c1'), column('c2')
+ q1 = sess.query(c1.label('foo'), c2.label('bar')).filter(c1 == 'dog')
+ q2 = sess.query(c1.label('foo'), c2.label('bar')).filter(c1 == 'cat')
+ q3 = q1.union(q2)
+ self.assert_compile(
+ q3.order_by(c1),
+ "SELECT anon_1.foo AS anon_1_foo, anon_1.bar AS anon_1_bar FROM "
+ "(SELECT c1 AS foo, c2 AS bar WHERE c1 = :c1_1 UNION SELECT "
+ "c1 AS foo, c2 AS bar WHERE c1 = :c1_2) AS anon_1 ORDER BY anon_1.foo"
+ )
+
+ def test_anonymous_expression_plus_aliased_join(self):
+ """test that the 'dont alias non-ORM' rule remains for other
+ kinds of aliasing when _from_selectable() is used."""
+
+ User = self.classes.User
+ Address = self.classes.Address
+ addresses = self.tables.addresses
+
+ sess = create_session()
+ q1 = sess.query(User.id).filter(User.id > 5)
+ q1 = q1.from_self()
+ q1 = q1.join(User.addresses, aliased=True).\
+ order_by(User.id, Address.id, addresses.c.id)
+ self.assert_compile(
+ q1,
+ "SELECT anon_1.users_id AS anon_1_users_id "
+ "FROM (SELECT users.id AS users_id FROM users "
+ "WHERE users.id > :id_1) AS anon_1 JOIN addresses AS addresses_1 "
+ "ON anon_1.users_id = addresses_1.user_id "
+ "ORDER BY anon_1.users_id, addresses_1.id, addresses.id"
+ )
+
class AddEntityEquivalenceTest(fixtures.MappedTest, AssertsCompiledSQL):
run_setup_mappers = 'once'
use_default_dialect=True
)
+ def test_order_by_anonymous_col(self):
+ User = self.classes.User
+
+ s = Session()
+
+ c1, c2 = column('c1'), column('c2')
+ f = c1.label('foo')
+ q1 = s.query(User, f, c2.label('bar'))
+ q2 = s.query(User, c1.label('foo'), c2.label('bar'))
+ q3 = q1.union(q2)
+
+ self.assert_compile(
+ q3.order_by(c1),
+ "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS "
+ "anon_1_users_name, anon_1.foo AS anon_1_foo, anon_1.bar AS "
+ "anon_1_bar FROM (SELECT users.id AS users_id, users.name AS "
+ "users_name, c1 AS foo, c2 AS bar FROM users UNION SELECT users.id "
+ "AS users_id, users.name AS users_name, c1 AS foo, c2 AS bar "
+ "FROM users) AS anon_1 ORDER BY anon_1.foo"
+ )
+
+ self.assert_compile(
+ q3.order_by(f),
+ "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS "
+ "anon_1_users_name, anon_1.foo AS anon_1_foo, anon_1.bar AS "
+ "anon_1_bar FROM (SELECT users.id AS users_id, users.name AS "
+ "users_name, c1 AS foo, c2 AS bar FROM users UNION SELECT users.id "
+ "AS users_id, users.name AS users_name, c1 AS foo, c2 AS bar "
+ "FROM users) AS anon_1 ORDER BY anon_1.foo"
+ )
+
def test_union_mapped_colnames_preserved_across_subquery(self):
User = self.classes.User