>>> from sqlalchemy.orm import eagerload
{sql}>>> jack = session.query(User).options(eagerload('addresses')).filter_by(name='jack').one() #doctest: +NORMALIZE_WHITESPACE
- SELECT addresses_1.id AS addresses_1_id, addresses_1.email_address AS addresses_1_email_address,
- addresses_1.user_id AS addresses_1_user_id, users.id AS users_id, users.name AS users_name,
- users.fullname AS users_fullname, users.password AS users_password
- FROM (SELECT users.id AS users_id, users.oid AS users_oid
+ SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS anon_1_users_name,
+ anon_1.users_fullname AS anon_1_users_fullname, anon_1.users_password AS anon_1_users_password,
+ addresses_2.id AS addresses_2_id, addresses_2.email_address AS addresses_2_email_address,
+ addresses_2.user_id AS addresses_2_user_id
+ FROM (SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname,
+ users.password AS users_password, users.oid AS users_oid
FROM users
- WHERE users.name = ? ORDER BY users.oid LIMIT 2 OFFSET 0) AS tbl_row_count,
- users LEFT OUTER JOIN addresses AS addresses_1 ON users.id = addresses_1.user_id
- WHERE users.id = tbl_row_count.users_id ORDER BY tbl_row_count.oid, addresses_1.oid
- ['jack']
+ WHERE users.name = ? ORDER BY users.oid
+ LIMIT 2 OFFSET 0) AS anon_1 LEFT OUTER JOIN addresses AS addresses_2
+ ON anon_1.users_id = addresses_2.user_id ORDER BY anon_1.oid, addresses_2.oid
+ ['jack']
>>> jack
<User('jack','Jack Bean', 'gjffdd')>
{python}
>>> s = select([addresses.c.user_id, func.count(addresses.c.id)]).\
- ... group_by(addresses.c.user_id).having(func.count(addresses.c.id>1))
+ ... group_by(addresses.c.user_id).having(func.count(addresses.c.id)>1)
{opensql}>>> print conn.execute(s).fetchall()
SELECT addresses.user_id, count(addresses.id)
FROM addresses GROUP BY addresses.user_id
- HAVING count(addresses.id > ?)
+ HAVING count(addresses.id) > ?
[1]
{stop}[(1, 2), (2, 2)]
part of the "deep" copy afforded by a traversal that combines
the _copy_internals() method.
"""
-
c = self.__class__.__new__(self.__class__)
c.__dict__ = self.__dict__.copy()
return c
return self.selectable.columns
def _clone(self):
- # Alias is immutable
- return self
+ # TODO: need test coverage to assert ClauseAdapter behavior
+ # here; must identify non-ORM failure cases when a. _clone() returns 'self' in all
+ # cases and b. when _clone() does an actual _clone() in all cases.
+ if isinstance(self.selectable, TableClause):
+ return self
+ else:
+ return super(Alias, self)._clone()
def _copy_internals(self, clone=_clone):
- pass
+ self._clone_from_clause()
+ self.selectable = _clone(self.selectable)
+ baseselectable = self.selectable
+ while isinstance(baseselectable, Alias):
+ baseselectable = baseselectable.selectable
+ self.original = baseselectable
def get_children(self, **kwargs):
for c in self.c:
self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]), clone=True), "SELECT * FROM table1 AS t1alias, table2 WHERE t1alias.col1 = table2.col2")
self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t1), clone=True), "SELECT * FROM table2 WHERE t1alias.col1 = table2.col2")
self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t2), clone=True), "SELECT * FROM table1 AS t1alias WHERE t1alias.col1 = table2.col2")
-
+
+ s = select(['*'], from_obj=[t1]).alias('foo')
+ self.assert_compile(s.select(), "SELECT foo.* FROM (SELECT * FROM table1) AS foo")
+ self.assert_compile(vis.traverse(s.select(), clone=True), "SELECT foo.* FROM (SELECT * FROM table1 AS t1alias) AS foo")
+ self.assert_compile(s.select(), "SELECT foo.* FROM (SELECT * FROM table1) AS foo")
+
ff = vis.traverse(func.count(t1.c.col1).label('foo'), clone=True)
self.assert_compile(ff, "count(t1alias.col1) AS foo")
assert ff._get_from_objects() == [t1alias]
# TODO:
-# self.assert_compile(vis.traverse(select([func.count(t1.c.col1).label('foo')]), clone=True), "SELECT count(t1alias.col1) AS foo FROM table1 AS t1alias")
+ # self.assert_compile(vis.traverse(select([func.count(t1.c.col1).label('foo')]), clone=True), "SELECT count(t1alias.col1) AS foo FROM table1 AS t1alias")
t2alias = t2.alias('t2alias')
vis.chain(sql_util.ClauseAdapter(t2alias))
assert x == y == z == w == q == r
+
def test_column_order_with_simple_query(self):
# should return values in column definition order
users.insert().execute(user_id=1, user_name='foo')
assert False
except AttributeError:
assert True
+
+ def test_functions_with_cols(self):
+ from sqlalchemy.sql import column
+ users = table('users', column('id'), column('name'), column('fullname'))
+ calculate = select([column('q'), column('z'), column('r')],
+ from_obj=[func.calculate(bindparam('x'), bindparam('y'))])
+ self.assert_compile(select([users], users.c.id > calculate.c.z),
+ "SELECT users.id, users.name, users.fullname "
+ "FROM users, (SELECT q, z, r "
+ "FROM calculate(:x, :y)) "
+ "WHERE users.id > z"
+ )
+
+ print "--------------------------------------------------"
+ s = select([users], users.c.id.between(
+ calculate.alias('c1').unique_params(x=17, y=45).c.z,
+ calculate.alias('c2').unique_params(x=5, y=12).c.z))
+
+ self.assert_compile(s,
+ "SELECT users.id, users.name, users.fullname "
+ "FROM users, (SELECT q, z, r "
+ "FROM calculate(:x, :y)) AS c1, (SELECT q, z, r "
+ "FROM calculate(:x_1, :y_1)) AS c2 "
+ "WHERE users.id BETWEEN c1.z AND c2.z"
+ , checkparams={'y': 45, 'x': 17, 'y_1': 12, 'x_1': 5})
+
def testextract(self):
"""test the EXTRACT function"""
self.assert_compile(select([extract("month", table3.c.otherstuff)]), "SELECT extract(month FROM thirdtable.otherstuff) FROM thirdtable")