from sqlalchemy.util import topological
from sqlalchemy.sql import expression, operators, visitors
from itertools import chain
+from collections import deque
"""Utility functions that build upon SQL and Schema constructs."""
visitors.traverse(clause, {}, {'column':cols.add})
return cols
+def unwrap_order_by(clause):
+ """Break up an 'order by' expression into individual column-expressions,
+ without DESC/ASC/NULLS FIRST/NULLS LAST"""
+
+ cols = util.column_set()
+ stack = deque([clause])
+ while stack:
+ t = stack.popleft()
+ if isinstance(t, expression.ColumnElement) and \
+ (
+ not isinstance(t, expression._UnaryExpression) or \
+ not operators.is_ordering_modifier(t.modifier)
+ ):
+ cols.add(t)
+ else:
+ for c in t.get_children():
+ stack.append(c)
+ return cols
+
def clause_is_present(clause, search):
"""Given a target clause and a second to search within, return True
if the target is plainly present in the search without any
self.equivalents = util.column_dict(equivalents or {})
def _corresponding_column(self, col, require_embedded, _seen=util.EMPTY_SET):
- newcol = self.selectable.corresponding_column(col, require_embedded=require_embedded)
+ newcol = self.selectable.corresponding_column(
+ col,
+ require_embedded=require_embedded)
if newcol is None and col in self.equivalents and col not in _seen:
for equiv in self.equivalents[col]:
- newcol = self._corresponding_column(equiv, require_embedded=require_embedded, _seen=_seen.union([col]))
+ newcol = self._corresponding_column(equiv,
+ require_embedded=require_embedded,
+ _seen=_seen.union([col]))
if newcol is not None:
return newcol
return newcol
func
from test.lib.schema import Table, Column
from sqlalchemy.orm import mapper, relationship, create_session, \
- lazyload, aliased
+ lazyload, aliased, column_property
from test.lib.testing import eq_, assert_raises, \
assert_raises_message
from test.lib.assertsql import CompiledSQL
from sqlalchemy.util import OrderedDict as odict
import datetime
+
class EagerTest(_fixtures.FixtureTest, testing.AssertsCompiledSQL):
run_inserts = 'once'
run_deletes = None
use_default_dialect=True
)
+class SubqueryAliasingTest(fixtures.MappedTest, testing.AssertsCompiledSQL):
+ """test #2188"""
+
+ __dialect__ = 'default'
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table('a', metadata,
+ Column('id', Integer, primary_key=True)
+ )
+
+ Table('b', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('a_id', Integer, ForeignKey('a.id')),
+ Column('value', Integer),
+ )
+
+ @classmethod
+ def setup_classes(cls):
+
+ class A(cls.Comparable):
+ pass
+ class B(cls.Comparable):
+ pass
+
+ def _fixture(self, props):
+ A, B = self.classes.A, self.classes.B
+ b_table, a_table = self.tables.b, self.tables.a
+ mapper(A,a_table, properties=props)
+ mapper(B,b_table,properties = {
+ 'a':relationship(A, backref="bs")
+ })
+
+ def test_column_property(self):
+ A, B = self.classes.A, self.classes.B
+ b_table, a_table = self.tables.b, self.tables.a
+ cp = select([func.sum(b_table.c.value)]).\
+ where(b_table.c.a_id==a_table.c.id)
+
+ self._fixture({
+ 'summation':column_property(cp)
+ })
+ self.assert_compile(
+ create_session().query(A).options(joinedload_all('bs')).
+ order_by(A.summation).
+ limit(50),
+ "SELECT anon_1.anon_2 AS anon_1_anon_2, anon_1.a_id "
+ "AS anon_1_a_id, b_1.id AS b_1_id, b_1.a_id AS "
+ "b_1_a_id, b_1.value AS b_1_value FROM (SELECT "
+ "(SELECT sum(b.value) AS sum_1 FROM b WHERE b.a_id = a.id) "
+ "AS anon_2, a.id AS a_id FROM a ORDER BY (SELECT "
+ "sum(b.value) AS sum_1 FROM b WHERE b.a_id = a.id) "
+ "LIMIT :param_1) AS anon_1 LEFT OUTER JOIN b AS b_1 ON "
+ "anon_1.a_id = b_1.a_id ORDER BY anon_1.anon_2"
+ )
+
+ def test_column_property_desc(self):
+ A, B = self.classes.A, self.classes.B
+ b_table, a_table = self.tables.b, self.tables.a
+ cp = select([func.sum(b_table.c.value)]).\
+ where(b_table.c.a_id==a_table.c.id)
+
+ self._fixture({
+ 'summation':column_property(cp)
+ })
+ self.assert_compile(
+ create_session().query(A).options(joinedload_all('bs')).
+ order_by(A.summation.desc()).
+ limit(50),
+ "SELECT anon_1.anon_2 AS anon_1_anon_2, anon_1.a_id "
+ "AS anon_1_a_id, b_1.id AS b_1_id, b_1.a_id AS "
+ "b_1_a_id, b_1.value AS b_1_value FROM (SELECT "
+ "(SELECT sum(b.value) AS sum_1 FROM b WHERE b.a_id = a.id) "
+ "AS anon_2, a.id AS a_id FROM a ORDER BY (SELECT "
+ "sum(b.value) AS sum_1 FROM b WHERE b.a_id = a.id) DESC "
+ "LIMIT :param_1) AS anon_1 LEFT OUTER JOIN b AS b_1 ON "
+ "anon_1.a_id = b_1.a_id ORDER BY anon_1.anon_2 DESC"
+ )
+
+ def test_column_property_correlated(self):
+ A, B = self.classes.A, self.classes.B
+ b_table, a_table = self.tables.b, self.tables.a
+ cp = select([func.sum(b_table.c.value)]).\
+ where(b_table.c.a_id==a_table.c.id).\
+ correlate(a_table)
+
+ self._fixture({
+ 'summation':column_property(cp)
+ })
+ self.assert_compile(
+ create_session().query(A).options(joinedload_all('bs')).
+ order_by(A.summation).
+ limit(50),
+ "SELECT anon_1.anon_2 AS anon_1_anon_2, anon_1.a_id "
+ "AS anon_1_a_id, b_1.id AS b_1_id, b_1.a_id AS "
+ "b_1_a_id, b_1.value AS b_1_value FROM (SELECT "
+ "(SELECT sum(b.value) AS sum_1 FROM b WHERE b.a_id = a.id) "
+ "AS anon_2, a.id AS a_id FROM a ORDER BY (SELECT "
+ "sum(b.value) AS sum_1 FROM b WHERE b.a_id = a.id) "
+ "LIMIT :param_1) AS anon_1 LEFT OUTER JOIN b AS b_1 ON "
+ "anon_1.a_id = b_1.a_id ORDER BY anon_1.anon_2"
+ )
+
+ def test_standalone_subquery_unlabeled(self):
+ A, B = self.classes.A, self.classes.B
+ b_table, a_table = self.tables.b, self.tables.a
+ self._fixture({})
+ cp = select([func.sum(b_table.c.value)]).\
+ where(b_table.c.a_id==a_table.c.id).\
+ correlate(a_table).as_scalar()
+ # note its re-rendering the subquery in the
+ # outermost order by. usually we want it to address
+ # the column within the subquery. labelling fixes that.
+ self.assert_compile(
+ create_session().query(A).options(joinedload_all('bs')).
+ order_by(cp).
+ limit(50),
+ "SELECT anon_1.a_id AS anon_1_a_id, anon_1.anon_2 "
+ "AS anon_1_anon_2, b_1.id AS b_1_id, b_1.a_id AS "
+ "b_1_a_id, b_1.value AS b_1_value FROM (SELECT a.id "
+ "AS a_id, (SELECT sum(b.value) AS sum_1 FROM b WHERE "
+ "b.a_id = a.id) AS anon_2 FROM a ORDER BY (SELECT "
+ "sum(b.value) AS sum_1 FROM b WHERE b.a_id = a.id) "
+ "LIMIT :param_1) AS anon_1 LEFT OUTER JOIN b AS b_1 "
+ "ON anon_1.a_id = b_1.a_id ORDER BY "
+ "(SELECT anon_1.anon_2 FROM b WHERE b.a_id = anon_1.a_id)"
+ )
+
+ def test_standalone_subquery_labeled(self):
+ A, B = self.classes.A, self.classes.B
+ b_table, a_table = self.tables.b, self.tables.a
+ self._fixture({})
+ cp = select([func.sum(b_table.c.value)]).\
+ where(b_table.c.a_id==a_table.c.id).\
+ correlate(a_table).as_scalar().label('foo')
+ self.assert_compile(
+ create_session().query(A).options(joinedload_all('bs')).
+ order_by(cp).
+ limit(50),
+ "SELECT anon_1.a_id AS anon_1_a_id, anon_1.foo "
+ "AS anon_1_foo, b_1.id AS b_1_id, b_1.a_id AS "
+ "b_1_a_id, b_1.value AS b_1_value FROM (SELECT a.id "
+ "AS a_id, (SELECT sum(b.value) AS sum_1 FROM b WHERE "
+ "b.a_id = a.id) AS foo FROM a ORDER BY (SELECT "
+ "sum(b.value) AS sum_1 FROM b WHERE b.a_id = a.id) "
+ "LIMIT :param_1) AS anon_1 LEFT OUTER JOIN b AS b_1 "
+ "ON anon_1.a_id = b_1.a_id ORDER BY "
+ "anon_1.foo"
+ )
+
+ def test_standalone_negated(self):
+ A, B = self.classes.A, self.classes.B
+ b_table, a_table = self.tables.b, self.tables.a
+ self._fixture({})
+ cp = select([func.sum(b_table.c.value)]).\
+ where(b_table.c.a_id==a_table.c.id).\
+ correlate(a_table).\
+ as_scalar()
+ # test a different unary operator
+ self.assert_compile(
+ create_session().query(A).options(joinedload_all('bs')).
+ order_by(~cp).
+ limit(50),
+ "SELECT anon_1.a_id AS anon_1_a_id, anon_1.anon_2 "
+ "AS anon_1_anon_2, b_1.id AS b_1_id, b_1.a_id AS "
+ "b_1_a_id, b_1.value AS b_1_value FROM (SELECT a.id "
+ "AS a_id, NOT (SELECT sum(b.value) AS sum_1 FROM b "
+ "WHERE b.a_id = a.id) FROM a ORDER BY NOT (SELECT "
+ "sum(b.value) AS sum_1 FROM b WHERE b.a_id = a.id) "
+ "LIMIT :param_1) AS anon_1 LEFT OUTER JOIN b AS b_1 "
+ "ON anon_1.a_id = b_1.a_id ORDER BY anon_1.anon_2"
+ )
+
+
+
+
class AddEntityTest(_fixtures.FixtureTest):
run_inserts = 'once'
run_deletes = None