From: Mike Bayer Date: Sat, 18 Sep 2010 17:18:44 +0000 (-0400) Subject: - An informative error message is raised if a Column X-Git-Tag: rel_0_6_5~58 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=003149c504149849c679a60a1f346a0f0393dce0;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - An informative error message is raised if a Column which has not yet been assigned a name, i.e. as in declarative, is used in a context where it is exported to the columns collection of an enclosing select() construct, or if any construct involving that column is compiled before its name is assigned. [ticket:1862] --- diff --git a/CHANGES b/CHANGES index 1cf7a39f62..87ca1c63e5 100644 --- a/CHANGES +++ b/CHANGES @@ -91,6 +91,14 @@ CHANGES - Table.tometadata() issues a warning if the given Table is already present in the target MetaData - the existing Table object is returned. + + - An informative error message is raised if a Column + which has not yet been assigned a name, i.e. as in + declarative, is used in a context where it is + exported to the columns collection of an enclosing + select() construct, or if any construct involving + that column is compiled before its name is + assigned. [ticket:1862] - engine diff --git a/lib/sqlalchemy/schema.py b/lib/sqlalchemy/schema.py index 4b7916e98f..59f9fae7f1 100644 --- a/lib/sqlalchemy/schema.py +++ b/lib/sqlalchemy/schema.py @@ -922,6 +922,10 @@ class Column(SchemaItem, expression.ColumnClause): """ fk = [ForeignKey(f.column) for f in self.foreign_keys] + if name is None and self.name is None: + raise exc.InvalidRequestError("Cannot initialize a sub-selectable" + " with this Column object until it's 'name' has " + "been assigned.") c = self._constructor( name or self.name, self.type, diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py index fcff5e3550..e47db7e283 100644 --- a/lib/sqlalchemy/sql/compiler.py +++ b/lib/sqlalchemy/sql/compiler.py @@ -332,6 +332,10 @@ class SQLCompiler(engine.Compiled): def visit_column(self, column, result_map=None, **kwargs): name = column.name + if name is None: + raise exc.CompileError("Cannot compile Column object until " + "it's 'name' is assigned.") + if not column.is_literal and isinstance(name, sql._generated_label): name = self._truncated_identifier("colident", name) diff --git a/lib/sqlalchemy/sql/expression.py b/lib/sqlalchemy/sql/expression.py index 1b1cfee8a7..70d5f13fc0 100644 --- a/lib/sqlalchemy/sql/expression.py +++ b/lib/sqlalchemy/sql/expression.py @@ -1852,17 +1852,19 @@ class ColumnElement(ClauseElement, _CompareMixin): descending selectable. """ - - if name: - co = ColumnClause(name, selectable, type_=getattr(self, - 'type', None)) + if name is None: + name = self.anon_label + # TODO: may want to change this to anon_label, + # or some value that is more useful than the + # compiled form of the expression + key = str(self) else: - name = str(self) - co = ColumnClause(self.anon_label, selectable, - type_=getattr(self, 'type', None)) - + key = name + + co = ColumnClause(name, selectable, type_=getattr(self, + 'type', None)) co.proxies = [self] - selectable.columns[name] = co + selectable.columns[key] = co return co def compare(self, other, use_proxies=False, equivalents=None, **kw): diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py index b7e5d09531..355e5dc70c 100644 --- a/test/sql/test_compiler.py +++ b/test/sql/test_compiler.py @@ -89,7 +89,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): def test_invalid_col_argument(self): assert_raises(exc.ArgumentError, select, table1) assert_raises(exc.ArgumentError, select, table1.c.myid) - + def test_from_subquery(self): """tests placing select statements in the column clause of another select, for the purposes of selecting from the exported columns of that select.""" @@ -263,13 +263,27 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A s3 = select([s2], use_labels=True) s4 = s3.alias() s5 = select([s4], use_labels=True) - self.assert_compile(s5, "SELECT anon_1.anon_2_myid AS anon_1_anon_2_myid, anon_1.anon_2_name AS anon_1_anon_2_name, "\ - "anon_1.anon_2_description AS anon_1_anon_2_description FROM (SELECT anon_2.myid AS anon_2_myid, anon_2.name AS anon_2_name, "\ - "anon_2.description AS anon_2_description FROM (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description "\ - "AS description FROM mytable) AS anon_2) AS anon_1") + self.assert_compile(s5, + 'SELECT anon_1.anon_2_myid AS ' + 'anon_1_anon_2_myid, anon_1.anon_2_name AS ' + 'anon_1_anon_2_name, anon_1.anon_2_descript' + 'ion AS anon_1_anon_2_description FROM ' + '(SELECT anon_2.myid AS anon_2_myid, ' + 'anon_2.name AS anon_2_name, ' + 'anon_2.description AS anon_2_description ' + 'FROM (SELECT mytable.myid AS myid, ' + 'mytable.name AS name, mytable.description ' + 'AS description FROM mytable) AS anon_2) ' + 'AS anon_1') def test_dont_overcorrelate(self): - self.assert_compile(select([table1], from_obj=[table1, table1.select()]), """SELECT mytable.myid, mytable.name, mytable.description FROM mytable, (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable)""") + self.assert_compile(select([table1], from_obj=[table1, + table1.select()]), + "SELECT mytable.myid, mytable.name, " + "mytable.description FROM mytable, (SELECT " + "mytable.myid AS myid, mytable.name AS " + "name, mytable.description AS description " + "FROM mytable)") def test_full_correlate(self): # intentional @@ -301,31 +315,56 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A "EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)" ) - self.assert_compile(exists([table1.c.myid], table1.c.myid==5).select(), "SELECT EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)", params={'mytable_myid':5}) - - self.assert_compile(select([table1, exists([1], from_obj=table2)]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) FROM mytable", params={}) - - self.assert_compile(select([table1, exists([1], from_obj=table2).label('foo')]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) AS foo FROM mytable", params={}) - - self.assert_compile( - table1.select(exists().where(table2.c.otherid == table1.c.myid).correlate(table1)), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT * FROM myothertable WHERE myothertable.otherid = mytable.myid)" - ) - - self.assert_compile( - table1.select(exists().where(table2.c.otherid == table1.c.myid).correlate(table1)), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT * FROM myothertable WHERE myothertable.otherid = mytable.myid)" - ) - - self.assert_compile( - table1.select(exists().where(table2.c.otherid == table1.c.myid).correlate(table1)).replace_selectable(table2, table2.alias()), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT * FROM myothertable AS myothertable_1 WHERE myothertable_1.otherid = mytable.myid)" - ) - - self.assert_compile( - table1.select(exists().where(table2.c.otherid == table1.c.myid).correlate(table1)).select_from(table1.join(table2, table1.c.myid==table2.c.otherid)).replace_selectable(table2, table2.alias()), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable JOIN myothertable AS myothertable_1 ON mytable.myid = myothertable_1.otherid WHERE EXISTS (SELECT * FROM myothertable AS myothertable_1 WHERE myothertable_1.otherid = mytable.myid)" - ) + self.assert_compile(exists([table1.c.myid], table1.c.myid + == 5).select(), + 'SELECT EXISTS (SELECT mytable.myid FROM ' + 'mytable WHERE mytable.myid = :myid_1)', + params={'mytable_myid': 5}) + self.assert_compile(select([table1, exists([1], + from_obj=table2)]), + 'SELECT mytable.myid, mytable.name, ' + 'mytable.description, EXISTS (SELECT 1 ' + 'FROM myothertable) FROM mytable', + params={}) + self.assert_compile(select([table1, exists([1], + from_obj=table2).label('foo')]), + 'SELECT mytable.myid, mytable.name, ' + 'mytable.description, EXISTS (SELECT 1 ' + 'FROM myothertable) AS foo FROM mytable', + params={}) + + self.assert_compile(table1.select(exists().where(table2.c.otherid + == table1.c.myid).correlate(table1)), + 'SELECT mytable.myid, mytable.name, ' + 'mytable.description FROM mytable WHERE ' + 'EXISTS (SELECT * FROM myothertable WHERE ' + 'myothertable.otherid = mytable.myid)') + self.assert_compile(table1.select(exists().where(table2.c.otherid + == table1.c.myid).correlate(table1)), + 'SELECT mytable.myid, mytable.name, ' + 'mytable.description FROM mytable WHERE ' + 'EXISTS (SELECT * FROM myothertable WHERE ' + 'myothertable.otherid = mytable.myid)') + self.assert_compile(table1.select(exists().where(table2.c.otherid + == table1.c.myid).correlate(table1)).replace_selectable(table2, + table2.alias()), + 'SELECT mytable.myid, mytable.name, ' + 'mytable.description FROM mytable WHERE ' + 'EXISTS (SELECT * FROM myothertable AS ' + 'myothertable_1 WHERE myothertable_1.otheri' + 'd = mytable.myid)') + self.assert_compile(table1.select(exists().where(table2.c.otherid + == table1.c.myid).correlate(table1)).select_from(table1.join(table2, + table1.c.myid + == table2.c.otherid)).replace_selectable(table2, + table2.alias()), + 'SELECT mytable.myid, mytable.name, ' + 'mytable.description FROM mytable JOIN ' + 'myothertable AS myothertable_1 ON ' + 'mytable.myid = myothertable_1.otherid ' + 'WHERE EXISTS (SELECT * FROM myothertable ' + 'AS myothertable_1 WHERE ' + 'myothertable_1.otherid = mytable.myid)') self.assert_compile( select([ @@ -334,62 +373,93 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A exists().where(table2.c.otherid=='bar') ) ]), - "SELECT (EXISTS (SELECT * FROM myothertable WHERE myothertable.otherid = :otherid_1)) "\ - "OR (EXISTS (SELECT * FROM myothertable WHERE myothertable.otherid = :otherid_2)) AS anon_1" + "SELECT (EXISTS (SELECT * FROM myothertable " + "WHERE myothertable.otherid = :otherid_1)) " + "OR (EXISTS (SELECT * FROM myothertable WHERE " + "myothertable.otherid = :otherid_2)) AS anon_1" ) def test_where_subquery(self): - s = select([addresses.c.street], addresses.c.user_id==users.c.user_id, correlate=True).alias('s') - self.assert_compile( - select([users, s.c.street], from_obj=s), - """SELECT users.user_id, users.user_name, users.password, s.street FROM users, (SELECT addresses.street AS street FROM addresses WHERE addresses.user_id = users.user_id) AS s""") - - self.assert_compile( - table1.select(table1.c.myid == select([table1.c.myid], table1.c.name=='jack')), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = (SELECT mytable.myid FROM mytable WHERE mytable.name = :name_1)" - ) - - self.assert_compile( - table1.select(table1.c.myid == select([table2.c.otherid], table1.c.name == table2.c.othername)), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = (SELECT myothertable.otherid FROM myothertable WHERE mytable.name = myothertable.othername)" - ) - - self.assert_compile( - table1.select(exists([1], table2.c.otherid == table1.c.myid)), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT 1 FROM myothertable WHERE myothertable.otherid = mytable.myid)" - ) - - + s = select([addresses.c.street], addresses.c.user_id + == users.c.user_id, correlate=True).alias('s') + self.assert_compile(select([users, s.c.street], from_obj=s), + "SELECT users.user_id, users.user_name, " + "users.password, s.street FROM users, " + "(SELECT addresses.street AS street FROM " + "addresses WHERE addresses.user_id = " + "users.user_id) AS s") + self.assert_compile(table1.select(table1.c.myid + == select([table1.c.myid], table1.c.name + == 'jack')), + 'SELECT mytable.myid, mytable.name, ' + 'mytable.description FROM mytable WHERE ' + 'mytable.myid = (SELECT mytable.myid FROM ' + 'mytable WHERE mytable.name = :name_1)') + self.assert_compile(table1.select(table1.c.myid + == select([table2.c.otherid], table1.c.name + == table2.c.othername)), + 'SELECT mytable.myid, mytable.name, ' + 'mytable.description FROM mytable WHERE ' + 'mytable.myid = (SELECT ' + 'myothertable.otherid FROM myothertable ' + 'WHERE mytable.name = myothertable.othernam' + 'e)') + self.assert_compile(table1.select(exists([1], table2.c.otherid + == table1.c.myid)), + 'SELECT mytable.myid, mytable.name, ' + 'mytable.description FROM mytable WHERE ' + 'EXISTS (SELECT 1 FROM myothertable WHERE ' + 'myothertable.otherid = mytable.myid)') talias = table1.alias('ta') - s = subquery('sq2', [talias], exists([1], table2.c.otherid == talias.c.myid)) - self.assert_compile( - select([s, table1]) - ,"SELECT sq2.myid, sq2.name, sq2.description, mytable.myid, mytable.name, mytable.description FROM (SELECT ta.myid AS myid, ta.name AS name, ta.description AS description FROM mytable AS ta WHERE EXISTS (SELECT 1 FROM myothertable WHERE myothertable.otherid = ta.myid)) AS sq2, mytable") - - s = select([addresses.c.street], addresses.c.user_id==users.c.user_id, correlate=True).alias('s') - self.assert_compile( - select([users, s.c.street], from_obj=s), - """SELECT users.user_id, users.user_name, users.password, s.street FROM users, (SELECT addresses.street AS street FROM addresses WHERE addresses.user_id = users.user_id) AS s""") - - # test constructing the outer query via append_column(), which occurs in the ORM's Query object - s = select([], exists([1], table2.c.otherid==table1.c.myid), from_obj=table1) + s = subquery('sq2', [talias], exists([1], table2.c.otherid + == talias.c.myid)) + self.assert_compile(select([s, table1]), + 'SELECT sq2.myid, sq2.name, ' + 'sq2.description, mytable.myid, ' + 'mytable.name, mytable.description FROM ' + '(SELECT ta.myid AS myid, ta.name AS name, ' + 'ta.description AS description FROM ' + 'mytable AS ta WHERE EXISTS (SELECT 1 FROM ' + 'myothertable WHERE myothertable.otherid = ' + 'ta.myid)) AS sq2, mytable') + s = select([addresses.c.street], addresses.c.user_id + == users.c.user_id, correlate=True).alias('s') + self.assert_compile(select([users, s.c.street], from_obj=s), + "SELECT users.user_id, users.user_name, " + "users.password, s.street FROM users, " + "(SELECT addresses.street AS street FROM " + "addresses WHERE addresses.user_id = " + "users.user_id) AS s") + + # test constructing the outer query via append_column(), which + # occurs in the ORM's Query object + + s = select([], exists([1], table2.c.otherid == table1.c.myid), + from_obj=table1) s.append_column(table1) - self.assert_compile( - s, - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT 1 FROM myothertable WHERE myothertable.otherid = mytable.myid)" - ) + self.assert_compile(s, + 'SELECT mytable.myid, mytable.name, ' + 'mytable.description FROM mytable WHERE ' + 'EXISTS (SELECT 1 FROM myothertable WHERE ' + 'myothertable.otherid = mytable.myid)') def test_orderby_subquery(self): - self.assert_compile( - table1.select(order_by=[select([table2.c.otherid], table1.c.myid==table2.c.otherid)]), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable ORDER BY (SELECT myothertable.otherid FROM myothertable WHERE mytable.myid = myothertable.otherid)" - ) - self.assert_compile( - table1.select(order_by=[desc(select([table2.c.otherid], table1.c.myid==table2.c.otherid))]), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable ORDER BY (SELECT myothertable.otherid FROM myothertable WHERE mytable.myid = myothertable.otherid) DESC" - ) + self.assert_compile(table1.select(order_by=[select([table2.c.otherid], + table1.c.myid == table2.c.otherid)]), + 'SELECT mytable.myid, mytable.name, ' + 'mytable.description FROM mytable ORDER BY ' + '(SELECT myothertable.otherid FROM ' + 'myothertable WHERE mytable.myid = ' + 'myothertable.otherid)') + self.assert_compile(table1.select(order_by=[desc(select([table2.c.otherid], + table1.c.myid == table2.c.otherid))]), + 'SELECT mytable.myid, mytable.name, ' + 'mytable.description FROM mytable ORDER BY ' + '(SELECT myothertable.otherid FROM ' + 'myothertable WHERE mytable.myid = ' + 'myothertable.otherid) DESC') @testing.uses_deprecated('scalar option') def test_scalar_select(self): @@ -401,41 +471,76 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A ) s = select([table1.c.myid], correlate=False).as_scalar() - self.assert_compile(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) AS anon_1 FROM mytable") - + self.assert_compile(select([table1, s]), + 'SELECT mytable.myid, mytable.name, ' + 'mytable.description, (SELECT mytable.myid ' + 'FROM mytable) AS anon_1 FROM mytable') s = select([table1.c.myid]).as_scalar() - self.assert_compile(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) AS anon_1 FROM myothertable") - + self.assert_compile(select([table2, s]), + 'SELECT myothertable.otherid, ' + 'myothertable.othername, (SELECT ' + 'mytable.myid FROM mytable) AS anon_1 FROM ' + 'myothertable') s = select([table1.c.myid]).correlate(None).as_scalar() - self.assert_compile(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) AS anon_1 FROM mytable") - - # test that aliases use as_scalar() when used in an explicitly scalar context - s = select([table1.c.myid]).alias() - self.assert_compile(select([table1.c.myid]).where(table1.c.myid==s), "SELECT mytable.myid FROM mytable WHERE mytable.myid = (SELECT mytable.myid FROM mytable)") - self.assert_compile(select([table1.c.myid]).where(s > table1.c.myid), "SELECT mytable.myid FROM mytable WHERE mytable.myid < (SELECT mytable.myid FROM mytable)") + self.assert_compile(select([table1, s]), + 'SELECT mytable.myid, mytable.name, ' + 'mytable.description, (SELECT mytable.myid ' + 'FROM mytable) AS anon_1 FROM mytable') + # test that aliases use as_scalar() when used in an explicitly + # scalar context + s = select([table1.c.myid]).alias() + self.assert_compile(select([table1.c.myid]).where(table1.c.myid + == s), + 'SELECT mytable.myid FROM mytable WHERE ' + 'mytable.myid = (SELECT mytable.myid FROM ' + 'mytable)') + self.assert_compile(select([table1.c.myid]).where(s + > table1.c.myid), + 'SELECT mytable.myid FROM mytable WHERE ' + 'mytable.myid < (SELECT mytable.myid FROM ' + 'mytable)') s = select([table1.c.myid]).as_scalar() - self.assert_compile(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) AS anon_1 FROM myothertable") + self.assert_compile(select([table2, s]), + 'SELECT myothertable.otherid, ' + 'myothertable.othername, (SELECT ' + 'mytable.myid FROM mytable) AS anon_1 FROM ' + 'myothertable') # test expressions against scalar selects - self.assert_compile(select([s - literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) - :param_1 AS anon_1") - self.assert_compile(select([select([table1.c.name]).as_scalar() + literal('x')]), "SELECT (SELECT mytable.name FROM mytable) || :param_1 AS anon_1") - self.assert_compile(select([s > literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) > :param_1 AS anon_1") - self.assert_compile(select([select([table1.c.name]).label('foo')]), "SELECT (SELECT mytable.name FROM mytable) AS foo") + self.assert_compile(select([s - literal(8)]), + 'SELECT (SELECT mytable.myid FROM mytable) ' + '- :param_1 AS anon_1') + self.assert_compile(select([select([table1.c.name]).as_scalar() + + literal('x')]), + 'SELECT (SELECT mytable.name FROM mytable) ' + '|| :param_1 AS anon_1') + self.assert_compile(select([s > literal(8)]), + 'SELECT (SELECT mytable.myid FROM mytable) ' + '> :param_1 AS anon_1') + self.assert_compile(select([select([table1.c.name]).label('foo' + )]), + 'SELECT (SELECT mytable.name FROM mytable) ' + 'AS foo') + + # scalar selects should not have any attributes on their 'c' or + # 'columns' attribute - # scalar selects should not have any attributes on their 'c' or 'columns' attribute s = select([table1.c.myid]).as_scalar() try: s.c.foo except exc.InvalidRequestError, err: - assert str(err) == 'Scalar Select expression has no columns; use this object directly within a column-level expression.' - + assert str(err) \ + == 'Scalar Select expression has no columns; use this '\ + 'object directly within a column-level expression.' try: s.columns.foo except exc.InvalidRequestError, err: - assert str(err) == 'Scalar Select expression has no columns; use this object directly within a column-level expression.' + assert str(err) \ + == 'Scalar Select expression has no columns; use this '\ + 'object directly within a column-level expression.' zips = table('zips', column('zipcode'), @@ -455,29 +560,55 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A order_by = ['dist', places.c.nm] ) - self.assert_compile(q,"SELECT places.id, places.nm, zips.zipcode, latlondist((SELECT zips.latitude FROM zips WHERE " - "zips.zipcode = :zipcode_1), (SELECT zips.longitude FROM zips WHERE zips.zipcode = :zipcode_2)) AS dist " - "FROM places, zips WHERE zips.zipcode = :zipcode_3 ORDER BY dist, places.nm") + self.assert_compile(q, + 'SELECT places.id, places.nm, ' + 'zips.zipcode, latlondist((SELECT ' + 'zips.latitude FROM zips WHERE ' + 'zips.zipcode = :zipcode_1), (SELECT ' + 'zips.longitude FROM zips WHERE ' + 'zips.zipcode = :zipcode_2)) AS dist FROM ' + 'places, zips WHERE zips.zipcode = ' + ':zipcode_3 ORDER BY dist, places.nm') zalias = zips.alias('main_zip') qlat = select([zips.c.latitude], zips.c.zipcode == zalias.c.zipcode).as_scalar() qlng = select([zips.c.longitude], zips.c.zipcode == zalias.c.zipcode).as_scalar() - q = select([places.c.id, places.c.nm, zalias.c.zipcode, func.latlondist(qlat, qlng).label('dist')], - order_by = ['dist', places.c.nm] - ) - self.assert_compile(q, "SELECT places.id, places.nm, main_zip.zipcode, latlondist((SELECT zips.latitude FROM zips WHERE zips.zipcode = main_zip.zipcode), (SELECT zips.longitude FROM zips WHERE zips.zipcode = main_zip.zipcode)) AS dist FROM places, zips AS main_zip ORDER BY dist, places.nm") + q = select([places.c.id, places.c.nm, zalias.c.zipcode, + func.latlondist(qlat, qlng).label('dist')], + order_by=['dist', places.c.nm]) + self.assert_compile(q, + 'SELECT places.id, places.nm, ' + 'main_zip.zipcode, latlondist((SELECT ' + 'zips.latitude FROM zips WHERE ' + 'zips.zipcode = main_zip.zipcode), (SELECT ' + 'zips.longitude FROM zips WHERE ' + 'zips.zipcode = main_zip.zipcode)) AS dist ' + 'FROM places, zips AS main_zip ORDER BY ' + 'dist, places.nm') a1 = table2.alias('t2alias') s1 = select([a1.c.otherid], table1.c.myid==a1.c.otherid).as_scalar() j1 = table1.join(table2, table1.c.myid==table2.c.otherid) s2 = select([table1, s1], from_obj=j1) - self.assert_compile(s2, "SELECT mytable.myid, mytable.name, mytable.description, (SELECT t2alias.otherid FROM myothertable AS t2alias WHERE mytable.myid = t2alias.otherid) AS anon_1 FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid") + self.assert_compile(s2, + 'SELECT mytable.myid, mytable.name, ' + 'mytable.description, (SELECT ' + 't2alias.otherid FROM myothertable AS ' + 't2alias WHERE mytable.myid = ' + 't2alias.otherid) AS anon_1 FROM mytable ' + 'JOIN myothertable ON mytable.myid = ' + 'myothertable.otherid') def test_label_comparison(self): x = func.lala(table1.c.myid).label('foo') - self.assert_compile(select([x], x==5), "SELECT lala(mytable.myid) AS foo FROM mytable WHERE lala(mytable.myid) = :param_1") + self.assert_compile(select([x], x == 5), + 'SELECT lala(mytable.myid) AS foo FROM ' + 'mytable WHERE lala(mytable.myid) = ' + ':param_1') - self.assert_compile(label('bar', column('foo', type_=String)) + "foo", "foo || :param_1") + self.assert_compile( + label('bar', column('foo', type_=String))+ 'foo', + 'foo || :param_1') def test_conjunctions(self): @@ -491,7 +622,8 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A ) self.assert_compile( - and_(table1.c.myid == 12, table1.c.name=='asdf', table2.c.othername == 'foo', "sysdate() = today()"), + and_(table1.c.myid == 12, table1.c.name=='asdf', + table2.c.othername == 'foo', "sysdate() = today()"), "mytable.myid = :myid_1 AND mytable.name = :name_1 "\ "AND myothertable.othername = :othername_1 AND sysdate() = today()" ) @@ -499,11 +631,14 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A self.assert_compile( and_( table1.c.myid == 12, - or_(table2.c.othername=='asdf', table2.c.othername == 'foo', table2.c.otherid == 9), + or_(table2.c.othername=='asdf', + table2.c.othername == 'foo', table2.c.otherid == 9), "sysdate() = today()", ), - "mytable.myid = :myid_1 AND (myothertable.othername = :othername_1 OR "\ - "myothertable.othername = :othername_2 OR myothertable.otherid = :otherid_1) AND sysdate() = today()", + 'mytable.myid = :myid_1 AND (myothertable.othername = ' + ':othername_1 OR myothertable.othername = :othername_2 OR ' + 'myothertable.otherid = :otherid_1) AND sysdate() = ' + 'today()', checkparams = {'othername_1': 'asdf', 'othername_2':'foo', 'otherid_1': 9, 'myid_1': 12} ) @@ -1763,18 +1898,65 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A self.assert_compile(table.select(between((table.c.field == table.c.field), False, True)), "SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :param_1 AND :param_2") + def test_delayed_col_naming(self): + my_str = Column(String) + + sel1 = select([my_str]) + + assert_raises_message( + exc.InvalidRequestError, + "Cannot initialize a sub-selectable with this Column", + lambda: sel1.c + ) + + assert_raises_message( + exc.CompileError, + "Cannot compile Column object until it's 'name' is assigned.", + lambda: select([func.substr(my_str, 2, 3)]).label('my_substr') + ) + + assert_raises_message( + exc.InvalidRequestError, + "Cannot initialize a sub-selectable with this Column", + lambda: select([my_str]).as_scalar() + ) + + + my_str.name = 'foo' + + self.assert_compile( + sel1, + "SELECT foo", + ) + self.assert_compile( + select([func.substr(my_str, 2, 3)]).label('my_substr'), + '(SELECT substr(foo, :substr_2, :substr_3) AS substr_1)', + ) + def test_naming(self): - s1 = select([table1.c.myid, table1.c.myid.label('foobar'), func.hoho(table1.c.name), func.lala(table1.c.name).label('gg')]) - assert s1.c.keys() == ['myid', 'foobar', 'hoho(mytable.name)', 'gg'] + f1 = func.hoho(table1.c.name) + s1 = select([table1.c.myid, table1.c.myid.label('foobar'), + f1, + func.lala(table1.c.name).label('gg')]) + + eq_( + s1.c.keys(), + ['myid', 'foobar', str(f1), 'gg'] + ) meta = MetaData() t1 = Table('mytable', meta, Column('col1', Integer)) + exprs = ( + table1.c.myid==12, + func.hoho(table1.c.myid), + cast(table1.c.name, Numeric) + ) for col, key, expr, label in ( (table1.c.name, 'name', 'mytable.name', None), - (table1.c.myid==12, 'mytable.myid = :myid_1', 'mytable.myid = :myid_1', 'anon_1'), - (func.hoho(table1.c.myid), 'hoho(mytable.myid)', 'hoho(mytable.myid)', 'hoho_1'), - (cast(table1.c.name, Numeric), 'CAST(mytable.name AS NUMERIC)', 'CAST(mytable.name AS NUMERIC)', 'anon_1'), + (exprs[0], str(exprs[0]), 'mytable.myid = :myid_1', 'anon_1'), + (exprs[1], str(exprs[1]), 'hoho(mytable.myid)', 'hoho_1'), + (exprs[2], str(exprs[2]), 'CAST(mytable.name AS NUMERIC)', 'anon_1'), (t1.c.col1, 'col1', 'mytable.col1', None), (column('some wacky thing'), 'some wacky thing', '"some wacky thing"', '') ): diff --git a/test/sql/test_query.py b/test/sql/test_query.py index 57f434a987..a87931bb35 100644 --- a/test/sql/test_query.py +++ b/test/sql/test_query.py @@ -795,7 +795,10 @@ class QueryTest(TestBase): ) shadowed.create(checkfirst=True) try: - shadowed.insert().execute(shadow_id=1, shadow_name='The Shadow', parent='The Light', row='Without light there is no shadow', _parent='Hidden parent', _row='Hidden row') + shadowed.insert().execute(shadow_id=1, shadow_name='The Shadow', parent='The Light', + row='Without light there is no shadow', + _parent='Hidden parent', + _row='Hidden row') r = shadowed.select(shadowed.c.shadow_id==1).execute().first() self.assert_(r.shadow_id == r['shadow_id'] == r[shadowed.c.shadow_id] == 1) self.assert_(r.shadow_name == r['shadow_name'] == r[shadowed.c.shadow_name] == 'The Shadow')