self._order_by = self._group_by = self._distinct = False
self._limit = self._offset = None
self._set_select_from(fromclause)
+ old_entities = self._entities
+ self._entities = []
+ for e in old_entities:
+ e.adapt_to_selectable(self, self._from_obj[0])
def values(self, *columns):
"""Return an iterator yielding result tuples corresponding to the given list of columns"""
SELECT * FROM (SELECT * FROM X UNION SELECT * FROM y UNION SELECT * FROM Z)
"""
+
+
return self._from_selectable(
expression.union(*([self]+ list(q))))
else:
return entity.base_mapper is self.path_entity
+ def adapt_to_selectable(self, query, sel):
+ query._entities.append(self)
+
def _get_entity_clauses(self, query, context):
adapter = None
self.entity_zero = list(self.entities)[0]
else:
self.entity_zero = None
-
+
+ def adapt_to_selectable(self, query, sel):
+ _ColumnEntity(query, sel.corresponding_column(self.column))
+
def setup_entity(self, entity, mapper, adapter, from_obj, is_aliased_class, with_polymorphic):
self.selectable = from_obj
self.froms.add(from_obj)
eq_(fred.union(ed, jack).order_by(User.name).all(),
[User(name='ed'), User(name='fred'), User(name='jack')]
)
+
+ def test_union_labels(self):
+ """test that column expressions translate during the _from_statement() portion of union(), others"""
+
+ s = create_session()
+ q1 = s.query(User, literal("x"))
+ q2 = s.query(User, literal_column("'y'"))
+ q3 = q1.union(q2)
+
+ self.assert_compile(
+ q3,
+ "SELECT anon_1.id AS anon_1_id, anon_1.name AS anon_1_name, anon_1.anon_2 AS anon_1_anon_2 FROM "
+ "(SELECT users.id AS id, users.name AS name, :param_1 AS anon_2 FROM users "
+ "UNION SELECT users.id AS id, users.name AS name, 'y' FROM users) AS anon_1"
+ , use_default_dialect = True
+ )
+
+ q4 = s.query(User, literal_column("'x'").label('foo'))
+ q5 = s.query(User, literal("y"))
+ q6 = q4.union(q5)
+
+ for q in (q3, q6):
+ eq_(q.all(),
+ [
+ (User(id=7, name=u'jack'), u'x'),
+ (User(id=7, name=u'jack'), u'y'),
+ (User(id=8, name=u'ed'), u'x'),
+ (User(id=8, name=u'ed'), u'y'),
+ (User(id=9, name=u'fred'), u'x'),
+ (User(id=9, name=u'fred'), u'y'),
+ (User(id=10, name=u'chuck'), u'x'),
+ (User(id=10, name=u'chuck'), u'y')
+ ]
+ )
@testing.fails_on('mysql', "mysql doesn't support intersect")
def test_intersect(self):
self.assert_sql_count(testing.db, go, 1)
-class MixedEntitiesTest(QueryTest):
+class MixedEntitiesTest(QueryTest, AssertsCompiledSQL):
def test_values(self):
sess = create_session()
for q in [
sess.query(Order, oalias).filter(Order.user_id==oalias.user_id).filter(Order.user_id==7).filter(Order.id>oalias.id).order_by(Order.id, oalias.id),
- sess.query(Order, oalias)._from_self().filter(Order.user_id==oalias.user_id).filter(Order.user_id==7).filter(Order.id>oalias.id).order_by(Order.id, oalias.id),
+ sess.query(Order, oalias).from_self().filter(Order.user_id==oalias.user_id).filter(Order.user_id==7).filter(Order.id>oalias.id).order_by(Order.id, oalias.id),
# same thing, but reversed.
- sess.query(oalias, Order)._from_self().filter(oalias.user_id==Order.user_id).filter(oalias.user_id==7).filter(Order.id<oalias.id).order_by(oalias.id, Order.id),
+ sess.query(oalias, Order).from_self().filter(oalias.user_id==Order.user_id).filter(oalias.user_id==7).filter(Order.id<oalias.id).order_by(oalias.id, Order.id),
# here we go....two layers of aliasing
- sess.query(Order, oalias).filter(Order.user_id==oalias.user_id).filter(Order.user_id==7).filter(Order.id>oalias.id)._from_self().order_by(Order.id, oalias.id).limit(10).options(eagerload(Order.items)),
+ sess.query(Order, oalias).filter(Order.user_id==oalias.user_id).filter(Order.user_id==7).filter(Order.id>oalias.id).from_self().order_by(Order.id, oalias.id).limit(10).options(eagerload(Order.items)),
# gratuitous four layers
- sess.query(Order, oalias).filter(Order.user_id==oalias.user_id).filter(Order.user_id==7).filter(Order.id>oalias.id)._from_self()._from_self()._from_self().order_by(Order.id, oalias.id).limit(10).options(eagerload(Order.items)),
+ sess.query(Order, oalias).filter(Order.user_id==oalias.user_id).filter(Order.user_id==7).filter(Order.id>oalias.id).from_self().from_self().from_self().order_by(Order.id, oalias.id).limit(10).options(eagerload(Order.items)),
]:
(Order(address_id=None,description=u'order 5',isopen=0,user_id=7,id=5), Order(address_id=1,description=u'order 3',isopen=1,user_id=7,id=3))
]
)
+
+
+ # ensure column expressions are taken from inside the subquery, not restated at the top
+ q = sess.query(Order.id, Order.description, literal_column("'q'").label('foo')).filter(Order.description == u'order 3').from_self()
+ self.assert_compile(q,
+ "SELECT anon_1.orders_id AS anon_1_orders_id, anon_1.orders_description AS anon_1_orders_description, "
+ "anon_1.foo AS anon_1_foo FROM (SELECT orders.id AS orders_id, orders.description AS orders_description, "
+ "'q' AS foo FROM orders WHERE orders.description = :description_1) AS anon_1", use_default_dialect=True)
+ eq_(
+ q.all(),
+ [(3, u'order 3', 'q')]
+ )
+
def test_multi_mappers(self):
sess.query(Node, parent, grandparent).\
join((Node.parent, parent), (parent.parent, grandparent)).\
filter(Node.data=='n122').filter(parent.data=='n12').\
- filter(grandparent.data=='n1')._from_self().first(),
+ filter(grandparent.data=='n1').from_self().first(),
(Node(data='n122'), Node(data='n12'), Node(data='n1'))
)
sess.query(parent, grandparent, Node).\
join((Node.parent, parent), (parent.parent, grandparent)).\
filter(Node.data=='n122').filter(parent.data=='n12').\
- filter(grandparent.data=='n1')._from_self().first(),
+ filter(grandparent.data=='n1').from_self().first(),
(Node(data='n12'), Node(data='n1'), Node(data='n122'))
)
sess.query(Node, parent, grandparent).\
join((Node.parent, parent), (parent.parent, grandparent)).\
filter(Node.data=='n122').filter(parent.data=='n12').\
- filter(grandparent.data=='n1')._from_self().\
+ filter(grandparent.data=='n1').from_self().\
options(eagerload(Node.children)).first(),
(Node(data='n122'), Node(data='n12'), Node(data='n1'))
)