From: Mike Bayer Date: Mon, 13 Aug 2007 20:00:38 +0000 (+0000) Subject: - generalized a SQLCompileTest out of select.py, installed X-Git-Tag: rel_0_4beta2~9 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=66142098952f6c1027dbede243a25c003671dee3;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - generalized a SQLCompileTest out of select.py, installed into dialect/mssql.py, dialect/oracle.py, sql/generative.py - fixed oracle issues [ticket:732], [ticket:733], [ticket:734] --- diff --git a/lib/sqlalchemy/databases/oracle.py b/lib/sqlalchemy/databases/oracle.py index 78ceddb4ee..2c45c94e8e 100644 --- a/lib/sqlalchemy/databases/oracle.py +++ b/lib/sqlalchemy/databases/oracle.py @@ -401,7 +401,7 @@ class OracleDialect(ansisql.ANSIDialect): def _normalize_name(self, name): if name is None: return None - elif name.upper() == name and not self.identifier_preparer._requires_quotes(name.lower(), True): + elif name.upper() == name and not self.identifier_preparer._requires_quotes(name.lower()): return name.lower() else: return name @@ -409,7 +409,7 @@ class OracleDialect(ansisql.ANSIDialect): def _denormalize_name(self, name): if name is None: return None - elif name.lower() == name and not self.identifier_preparer._requires_quotes(name.lower(), True): + elif name.lower() == name and not self.identifier_preparer._requires_quotes(name.lower()): return name.upper() else: return name @@ -605,7 +605,7 @@ class OracleCompiler(ansisql.ANSICompiler): """Oracle doesn't like ``FROM table AS alias``. Is the AS standard SQL??""" if asfrom: - return self.process(alias.original, asfrom=asfrom, **kwargs) + " " + alias.name + return self.process(alias.original, asfrom=asfrom, **kwargs) + " " + self.preparer.format_alias(alias, self._anonymize(alias.name)) else: return self.process(alias.original, **kwargs) @@ -650,7 +650,7 @@ class OracleCompiler(ansisql.ANSICompiler): limitselect.append_whereclause("ora_rn<=%d" % (select._limit + select._offset)) else: limitselect.append_whereclause("ora_rn<=%d" % select._limit) - return self.process(limitselect) + return self.process(limitselect, **kwargs) else: return ansisql.ANSICompiler.visit_select(self, select, **kwargs) diff --git a/test/dialect/mssql.py b/test/dialect/mssql.py index 309a65bee2..1825141af2 100755 --- a/test/dialect/mssql.py +++ b/test/dialect/mssql.py @@ -4,31 +4,22 @@ from sqlalchemy import * from sqlalchemy.databases import mssql from testlib import * -msdialect = mssql.MSSQLDialect() - # TODO: migrate all MS-SQL tests here -class CompileTest(AssertMixin): - def _test(self, statement, expected, **params): - if len(params): - res = str(statement.compile(dialect=msdialect, parameters=params)) - else: - res = str(statement.compile(dialect=msdialect)) - res = re.sub(r'\n', '', res) - - assert res == expected, res +class CompileTest(SQLCompileTest): + __dialect__ = mssql.MSSQLDialect() def test_insert(self): t = table('sometable', column('somecolumn')) - self._test(t.insert(), "INSERT INTO sometable (somecolumn) VALUES (:somecolumn)") + self.assert_compile(t.insert(), "INSERT INTO sometable (somecolumn) VALUES (:somecolumn)") def test_update(self): t = table('sometable', column('somecolumn')) - self._test(t.update(t.c.somecolumn==7), "UPDATE sometable SET somecolumn=:somecolumn WHERE sometable.somecolumn = :sometable_somecolumn", somecolumn=10) + self.assert_compile(t.update(t.c.somecolumn==7), "UPDATE sometable SET somecolumn=:somecolumn WHERE sometable.somecolumn = :sometable_somecolumn", dict(somecolumn=10)) def test_count(self): t = table('sometable', column('somecolumn')) - self._test(t.count(), "SELECT count(sometable.somecolumn) AS tbl_row_count FROM sometable") + self.assert_compile(t.count(), "SELECT count(sometable.somecolumn) AS tbl_row_count FROM sometable") def test_union(self): t1 = table('t1', @@ -48,9 +39,9 @@ class CompileTest(AssertMixin): select([t2.c.col3.label('col3'), t2.c.col4.label('col4')], t2.c.col2.in_("t2col2r2", "t2col2r3")) ) u = union(s1, s2, order_by=['col3', 'col4']) - self._test(u, "SELECT t1.col3 AS col3, t1.col4 AS col4 FROM t1 WHERE t1.col2 IN (:t1_col2, :t1_col2_1) UNION SELECT t2.col3 AS col3, t2.col4 AS col4 FROM t2 WHERE t2.col2 IN (:t2_col2, :t2_col2_1) ORDER BY col3, col4") + self.assert_compile(u, "SELECT t1.col3 AS col3, t1.col4 AS col4 FROM t1 WHERE t1.col2 IN (:t1_col2, :t1_col2_1) UNION SELECT t2.col3 AS col3, t2.col4 AS col4 FROM t2 WHERE t2.col2 IN (:t2_col2, :t2_col2_1) ORDER BY col3, col4") - self._test(u.alias('bar').select(), "SELECT bar.col3, bar.col4 FROM (SELECT t1.col3 AS col3, t1.col4 AS col4 FROM t1 WHERE t1.col2 IN (:t1_col2, :t1_col2_1) UNION SELECT t2.col3 AS col3, t2.col4 AS col4 FROM t2 WHERE t2.col2 IN (:t2_col2, :t2_col2_1)) AS bar") + self.assert_compile(u.alias('bar').select(), "SELECT bar.col3, bar.col4 FROM (SELECT t1.col3 AS col3, t1.col4 AS col4 FROM t1 WHERE t1.col2 IN (:t1_col2, :t1_col2_1) UNION SELECT t2.col3 AS col3, t2.col4 AS col4 FROM t2 WHERE t2.col2 IN (:t2_col2, :t2_col2_1)) AS bar") if __name__ == "__main__": testbase.main() diff --git a/test/dialect/oracle.py b/test/dialect/oracle.py index 14de8960b4..9f194bf070 100644 --- a/test/dialect/oracle.py +++ b/test/dialect/oracle.py @@ -1,6 +1,6 @@ import testbase from sqlalchemy import * -from sqlalchemy.databases import mysql +from sqlalchemy.databases import oracle from testlib import * @@ -28,5 +28,96 @@ create or replace procedure foo(x_in IN number, x_out OUT number, y_out OUT numb def tearDownAll(self): testbase.db.execute("DROP PROCEDURE foo") + +class CompileTest(SQLCompileTest): + __dialect__ = oracle.OracleDialect() + + def test_subquery(self): + t = table('sometable', column('col1'), column('col2')) + s = select([t]) + s = select([s.c.col1, s.c.col2]) + + self.assert_compile(s, "SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 FROM sometable)") + + def test_limit(self): + t = table('sometable', column('col1'), column('col2')) + + s = select([t]).limit(10).offset(20) + + self.assert_compile(s, "SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2, " + "ROW_NUMBER() OVER (ORDER BY sometable.rowid) AS ora_rn FROM sometable) WHERE ora_rn>20 AND ora_rn<=30" + ) + + s = select([s.c.col1, s.c.col2]) + + self.assert_compile(s, "SELECT col1, col2 FROM (SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, " + "sometable.col2 AS col2, ROW_NUMBER() OVER (ORDER BY sometable.rowid) AS ora_rn FROM sometable) WHERE ora_rn>20 AND ora_rn<=30)") + + # testing this twice to ensure oracle doesn't modify the original statement + self.assert_compile(s, "SELECT col1, col2 FROM (SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, " + "sometable.col2 AS col2, ROW_NUMBER() OVER (ORDER BY sometable.rowid) AS ora_rn FROM sometable) WHERE ora_rn>20 AND ora_rn<=30)") + + def test_outer_join(self): + table1 = table('mytable', + column('myid', Integer), + column('name', String), + column('description', String), + ) + + table2 = table( + 'myothertable', + column('otherid', Integer), + column('othername', String), + ) + + table3 = table( + 'thirdtable', + column('userid', Integer), + column('otherstuff', String), + ) + + query = select( + [table1, table2], + or_( + table1.c.name == 'fred', + table1.c.myid == 10, + table2.c.othername != 'jack', + "EXISTS (select yay from foo where boo = lar)" + ), + from_obj = [ outerjoin(table1, table2, table1.c.myid == table2.c.otherid) ] + ) + self.assert_compile(query, + "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \ +FROM mytable, myothertable WHERE mytable.myid = myothertable.otherid(+) AND \ +(mytable.name = :mytable_name OR mytable.myid = :mytable_myid OR \ +myothertable.othername != :myothertable_othername OR EXISTS (select yay from foo where boo = lar))", + dialect=oracle.OracleDialect(use_ansi = False)) + + query = table1.outerjoin(table2, table1.c.myid==table2.c.otherid).outerjoin(table3, table3.c.userid==table2.c.otherid) + self.assert_compile(query.select(), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid = myothertable.otherid LEFT OUTER JOIN thirdtable ON thirdtable.userid = myothertable.otherid") + self.assert_compile(query.select(), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable, myothertable, thirdtable WHERE mytable.myid = myothertable.otherid(+) AND thirdtable.userid(+) = myothertable.otherid", dialect=oracle.dialect(use_ansi=False)) + + def test_alias_outer_join(self): + address_types = table('address_types', + column('id'), + column('name'), + ) + addresses = table('addresses', + column('id'), + column('user_id'), + column('address_type_id'), + column('email_address') + ) + at_alias = address_types.alias() + + s = select([at_alias, addresses]).\ + select_from(addresses.outerjoin(at_alias, addresses.c.address_type_id==at_alias.c.id)).\ + where(addresses.c.user_id==7).\ + order_by(addresses.oid_column, address_types.oid_column) + self.assert_compile(s, "SELECT address_types_1.id, address_types_1.name, addresses.id, addresses.user_id, " + "addresses.address_type_id, addresses.email_address FROM addresses LEFT OUTER JOIN address_types address_types_1 " + "ON addresses.address_type_id = address_types_1.id WHERE addresses.user_id = :addresses_user_id ORDER BY addresses.rowid, " + "address_types.rowid") + if __name__ == '__main__': testbase.main() diff --git a/test/sql/generative.py b/test/sql/generative.py index 80a18d4979..d79af577f2 100644 --- a/test/sql/generative.py +++ b/test/sql/generative.py @@ -1,5 +1,4 @@ import testbase -from sql import select as selecttests from sqlalchemy import * from testlib import * @@ -132,7 +131,7 @@ class TraversalTest(AssertMixin): assert struct != s3 assert struct3 == s3 -class ClauseTest(selecttests.SQLTest): +class ClauseTest(SQLCompileTest): """test copy-in-place behavior of various ClauseElements.""" def setUpAll(self): @@ -211,7 +210,7 @@ class ClauseTest(selecttests.SQLTest): def visit_select(self, select): select.append_whereclause(t1.c.col2==7) - self.runtest(Vis().traverse(s, clone=True), "SELECT * FROM table1 WHERE table1.col1 = table2.col1 AND table1.col2 = :table1_col2") + self.assert_compile(Vis().traverse(s, clone=True), "SELECT * FROM table1 WHERE table1.col1 = table2.col1 AND table1.col2 = :table1_col2") def test_clause_adapter(self): from sqlalchemy import sql_util @@ -222,29 +221,29 @@ class ClauseTest(selecttests.SQLTest): ff = vis.traverse(func.count(t1.c.col1).label('foo'), clone=True) assert ff._get_from_objects() == [t1alias] - self.runtest(vis.traverse(select(['*'], from_obj=[t1]), clone=True), "SELECT * FROM table1 AS t1alias") - self.runtest(vis.traverse(select(['*'], t1.c.col1==t2.c.col2), clone=True), "SELECT * FROM table1 AS t1alias, table2 WHERE t1alias.col1 = table2.col2") - self.runtest(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]), clone=True), "SELECT * FROM table1 AS t1alias, table2 WHERE t1alias.col1 = table2.col2") - self.runtest(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t1), clone=True), "SELECT * FROM table2 WHERE t1alias.col1 = table2.col2") - self.runtest(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t2), clone=True), "SELECT * FROM table1 AS t1alias WHERE t1alias.col1 = table2.col2") + self.assert_compile(vis.traverse(select(['*'], from_obj=[t1]), clone=True), "SELECT * FROM table1 AS t1alias") + self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2), clone=True), "SELECT * FROM table1 AS t1alias, table2 WHERE t1alias.col1 = table2.col2") + self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]), clone=True), "SELECT * FROM table1 AS t1alias, table2 WHERE t1alias.col1 = table2.col2") + self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t1), clone=True), "SELECT * FROM table2 WHERE t1alias.col1 = table2.col2") + self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t2), clone=True), "SELECT * FROM table1 AS t1alias WHERE t1alias.col1 = table2.col2") ff = vis.traverse(func.count(t1.c.col1).label('foo'), clone=True) - self.runtest(ff, "count(t1alias.col1) AS foo") + self.assert_compile(ff, "count(t1alias.col1) AS foo") assert ff._get_from_objects() == [t1alias] # TODO: -# self.runtest(vis.traverse(select([func.count(t1.c.col1).label('foo')]), clone=True), "SELECT count(t1alias.col1) AS foo FROM table1 AS t1alias") +# self.assert_compile(vis.traverse(select([func.count(t1.c.col1).label('foo')]), clone=True), "SELECT count(t1alias.col1) AS foo FROM table1 AS t1alias") t2alias = t2.alias('t2alias') vis.chain(sql_util.ClauseAdapter(t2alias)) - self.runtest(vis.traverse(select(['*'], t1.c.col1==t2.c.col2), clone=True), "SELECT * FROM table1 AS t1alias, table2 AS t2alias WHERE t1alias.col1 = t2alias.col2") - self.runtest(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]), clone=True), "SELECT * FROM table1 AS t1alias, table2 AS t2alias WHERE t1alias.col1 = t2alias.col2") - self.runtest(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t1), clone=True), "SELECT * FROM table2 AS t2alias WHERE t1alias.col1 = t2alias.col2") - self.runtest(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t2), clone=True), "SELECT * FROM table1 AS t1alias WHERE t1alias.col1 = t2alias.col2") + self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2), clone=True), "SELECT * FROM table1 AS t1alias, table2 AS t2alias WHERE t1alias.col1 = t2alias.col2") + self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]), clone=True), "SELECT * FROM table1 AS t1alias, table2 AS t2alias WHERE t1alias.col1 = t2alias.col2") + self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t1), clone=True), "SELECT * FROM table2 AS t2alias WHERE t1alias.col1 = t2alias.col2") + self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t2), clone=True), "SELECT * FROM table1 AS t1alias WHERE t1alias.col1 = t2alias.col2") -class SelectTest(selecttests.SQLTest): +class SelectTest(SQLCompileTest): """tests the generative capability of Select""" def setUpAll(self): @@ -261,13 +260,13 @@ class SelectTest(selecttests.SQLTest): ) def test_select(self): - self.runtest(t1.select().where(t1.c.col1==5).order_by(t1.c.col3), "SELECT table1.col1, table1.col2, table1.col3 FROM table1 WHERE table1.col1 = :table1_col1 ORDER BY table1.col3") + self.assert_compile(t1.select().where(t1.c.col1==5).order_by(t1.c.col3), "SELECT table1.col1, table1.col2, table1.col3 FROM table1 WHERE table1.col1 = :table1_col1 ORDER BY table1.col3") - self.runtest(t1.select().select_from(select([t2], t2.c.col1==t1.c.col1)).order_by(t1.c.col3), "SELECT table1.col1, table1.col2, table1.col3 FROM table1, (SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 FROM table2 WHERE table2.col1 = table1.col1) ORDER BY table1.col3") + self.assert_compile(t1.select().select_from(select([t2], t2.c.col1==t1.c.col1)).order_by(t1.c.col3), "SELECT table1.col1, table1.col2, table1.col3 FROM table1, (SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 FROM table2 WHERE table2.col1 = table1.col1) ORDER BY table1.col3") s = select([t2], t2.c.col1==t1.c.col1, correlate=False) s = s.correlate(t1).order_by(t2.c.col3) - self.runtest(t1.select().select_from(s).order_by(t1.c.col3), "SELECT table1.col1, table1.col2, table1.col3 FROM table1, (SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 FROM table2 WHERE table2.col1 = table1.col1 ORDER BY table2.col3) ORDER BY table1.col3") + self.assert_compile(t1.select().select_from(s).order_by(t1.c.col3), "SELECT table1.col1, table1.col2, table1.col3 FROM table1, (SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 FROM table2 WHERE table2.col1 = table1.col1 ORDER BY table2.col3) ORDER BY table1.col3") if __name__ == '__main__': diff --git a/test/sql/select.py b/test/sql/select.py index 32a889b489..f5932d515a 100644 --- a/test/sql/select.py +++ b/test/sql/select.py @@ -51,19 +51,7 @@ addresses = table('addresses', column('zip') ) -class SQLTest(PersistTest): - def runtest(self, clause, result, dialect = None, params = None, checkparams = None): - c = clause.compile(parameters=params, dialect=dialect) - print "\nSQL String:\n" + str(c) + repr(c.get_params()) - cc = re.sub(r'\n', '', str(c)) - self.assert_(cc == result, "\n'" + cc + "'\n does not match \n'" + result + "'") - if checkparams is not None: - if isinstance(checkparams, list): - self.assert_(c.get_params().get_raw_list() == checkparams, "params dont match ") - else: - self.assert_(c.get_params().get_original_dict() == checkparams, "params dont match" + repr(c.get_params())) - -class SelectTest(SQLTest): +class SelectTest(SQLCompileTest): def test_attribute_sanity(self): assert hasattr(table1, 'c') @@ -79,16 +67,16 @@ class SelectTest(SQLTest): assert not hasattr(table1.alias().c.myid, 'c') def testtableselect(self): - self.runtest(table1.select(), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable") + self.assert_compile(table1.select(), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable") - self.runtest(select([table1, table2]), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, \ + self.assert_compile(select([table1, table2]), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, \ myothertable.othername FROM mytable, myothertable") def testselectselect(self): """tests placing select statements in the column clause of another select, for the purposes of selecting from the exported columns of that select.""" s = select([table1], table1.c.name == 'jack') - self.runtest( + self.assert_compile( select( [s], s.c.myid == 7 @@ -97,7 +85,7 @@ myothertable.othername FROM mytable, myothertable") "SELECT myid, name, description FROM (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable WHERE mytable.name = :mytable_name) WHERE myid = :myid") sq = select([table1]) - self.runtest( + self.assert_compile( sq.select(), "SELECT myid, name, description FROM (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable)" ) @@ -106,7 +94,7 @@ myothertable.othername FROM mytable, myothertable") [table1], ).alias('sq') - self.runtest( + self.assert_compile( sq.select(sq.c.myid == 7), "SELECT sq.myid, sq.name, sq.description FROM \ (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable) AS sq WHERE sq.myid = :sq_myid" @@ -123,7 +111,7 @@ mytable.description AS mytable_description, myothertable.otherid AS myothertable myothertable.othername AS myothertable_othername FROM mytable, myothertable \ WHERE mytable.myid = :mytable_myid AND myothertable.otherid = mytable.myid" - self.runtest(sq.select(), "SELECT sq.mytable_myid, sq.mytable_name, sq.mytable_description, sq.myothertable_otherid, \ + self.assert_compile(sq.select(), "SELECT sq.mytable_myid, sq.mytable_name, sq.mytable_description, sq.myothertable_otherid, \ sq.myothertable_othername FROM (" + sqstring + ") AS sq") sq2 = select( @@ -131,7 +119,7 @@ sq.myothertable_othername FROM (" + sqstring + ") AS sq") use_labels = True ).alias('sq2') - self.runtest(sq2.select(), "SELECT sq2.sq_mytable_myid, sq2.sq_mytable_name, sq2.sq_mytable_description, \ + self.assert_compile(sq2.select(), "SELECT sq2.sq_mytable_myid, sq2.sq_mytable_name, sq2.sq_mytable_description, \ sq2.sq_myothertable_otherid, sq2.sq_myothertable_othername FROM \ (SELECT sq.mytable_myid AS sq_mytable_myid, sq.mytable_name AS sq_mytable_name, \ sq.mytable_description AS sq_mytable_description, sq.myothertable_otherid AS sq_myothertable_otherid, \ @@ -142,66 +130,66 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A dialect = mssql.dialect() q = select([table1.c.myid], order_by=[table1.c.myid]).alias('foo') crit = q.c.myid == table1.c.myid - self.runtest(select(['*'], crit), """SELECT * FROM (SELECT mytable.myid AS myid FROM mytable ORDER BY mytable.myid) AS foo, mytable WHERE foo.myid = mytable.myid""", dialect=sqlite.dialect()) - self.runtest(select(['*'], crit), """SELECT * FROM (SELECT mytable.myid AS myid FROM mytable) AS foo, mytable WHERE foo.myid = mytable.myid""", dialect=mssql.dialect()) + self.assert_compile(select(['*'], crit), """SELECT * FROM (SELECT mytable.myid AS myid FROM mytable ORDER BY mytable.myid) AS foo, mytable WHERE foo.myid = mytable.myid""", dialect=sqlite.dialect()) + self.assert_compile(select(['*'], crit), """SELECT * FROM (SELECT mytable.myid AS myid FROM mytable) AS foo, mytable WHERE foo.myid = mytable.myid""", dialect=mssql.dialect()) def testmssql_aliases_schemas(self): - self.runtest(table4.select(), "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable") + self.assert_compile(table4.select(), "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable") dialect = mssql.dialect() - self.runtest(table4.select(), "SELECT remotetable_1.rem_id, remotetable_1.datatype_id, remotetable_1.value FROM remote_owner.remotetable AS remotetable_1", dialect=dialect) + self.assert_compile(table4.select(), "SELECT remotetable_1.rem_id, remotetable_1.datatype_id, remotetable_1.value FROM remote_owner.remotetable AS remotetable_1", dialect=dialect) # TODO: this is probably incorrect; no "AS " is being applied to the table - self.runtest(table1.join(table4, table1.c.myid==table4.c.rem_id).select(), "SELECT mytable.myid, mytable.name, mytable.description, remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM mytable JOIN remote_owner.remotetable ON remotetable.rem_id = mytable.myid") + self.assert_compile(table1.join(table4, table1.c.myid==table4.c.rem_id).select(), "SELECT mytable.myid, mytable.name, mytable.description, remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM mytable JOIN remote_owner.remotetable ON remotetable.rem_id = mytable.myid") def testdontovercorrelate(self): - self.runtest(select([table1], from_obj=[table1, table1.select()]), """SELECT mytable.myid, mytable.name, mytable.description FROM mytable, (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable)""") + self.assert_compile(select([table1], from_obj=[table1, table1.select()]), """SELECT mytable.myid, mytable.name, mytable.description FROM mytable, (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable)""") def testexistsascolumnclause(self): - self.runtest(exists([table1.c.myid], table1.c.myid==5).select(), "SELECT EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :mytable_myid)", params={'mytable_myid':5}) + self.assert_compile(exists([table1.c.myid], table1.c.myid==5).select(), "SELECT EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :mytable_myid)", params={'mytable_myid':5}) - self.runtest(select([table1, exists([1], from_obj=[table2])]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) FROM mytable", params={}) + self.assert_compile(select([table1, exists([1], from_obj=[table2])]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) FROM mytable", params={}) - self.runtest(select([table1, exists([1], from_obj=[table2]).label('foo')]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) AS foo FROM mytable", params={}) + self.assert_compile(select([table1, exists([1], from_obj=[table2]).label('foo')]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) AS foo FROM mytable", params={}) def test_generative_exists(self): - self.runtest( + self.assert_compile( table1.select(exists([1], table2.c.otherid == table1.c.myid).correlate(table1)), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT 1 FROM myothertable WHERE myothertable.otherid = mytable.myid)" ) - self.runtest( + self.assert_compile( table1.select(exists([1]).where(table2.c.otherid == table1.c.myid).correlate(table1)), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT 1 FROM myothertable WHERE myothertable.otherid = mytable.myid)" ) - self.runtest( + self.assert_compile( table1.select(exists([1]).where(table2.c.otherid == table1.c.myid).correlate(table1)).replace_selectable(table2, table2.alias()), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT 1 FROM myothertable AS myothertable_1 WHERE myothertable_1.otherid = mytable.myid)" ) - self.runtest( + self.assert_compile( table1.select(exists([1]).where(table2.c.otherid == table1.c.myid).correlate(table1)).select_from(table1.join(table2, table1.c.myid==table2.c.otherid)).replace_selectable(table2, table2.alias()), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable JOIN myothertable AS myothertable_1 ON mytable.myid = myothertable_1.otherid WHERE EXISTS (SELECT 1 FROM myothertable AS myothertable_1 WHERE myothertable_1.otherid = mytable.myid)" ) def testwheresubquery(self): s = select([addresses.c.street], addresses.c.user_id==users.c.user_id, correlate=True).alias('s') - self.runtest( + self.assert_compile( select([users, s.c.street], from_obj=[s]), """SELECT users.user_id, users.user_name, users.password, s.street FROM users, (SELECT addresses.street AS street FROM addresses WHERE addresses.user_id = users.user_id) AS s""") - self.runtest( + self.assert_compile( table1.select(table1.c.myid == select([table1.c.myid], table1.c.name=='jack')), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = (SELECT mytable.myid FROM mytable WHERE mytable.name = :mytable_name)" ) - self.runtest( + self.assert_compile( table1.select(table1.c.myid == select([table2.c.otherid], table1.c.name == table2.c.othername)), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = (SELECT myothertable.otherid FROM myothertable WHERE mytable.name = myothertable.othername)" ) - self.runtest( + self.assert_compile( table1.select(exists([1], table2.c.otherid == table1.c.myid)), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT 1 FROM myothertable WHERE myothertable.otherid = mytable.myid)" ) @@ -209,30 +197,30 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A talias = table1.alias('ta') s = subquery('sq2', [talias], exists([1], table2.c.otherid == talias.c.myid)) - self.runtest( + self.assert_compile( select([s, table1]) ,"SELECT sq2.myid, sq2.name, sq2.description, mytable.myid, mytable.name, mytable.description FROM (SELECT ta.myid AS myid, ta.name AS name, ta.description AS description FROM mytable AS ta WHERE EXISTS (SELECT 1 FROM myothertable WHERE myothertable.otherid = ta.myid)) AS sq2, mytable") s = select([addresses.c.street], addresses.c.user_id==users.c.user_id, correlate=True).alias('s') - self.runtest( + self.assert_compile( select([users, s.c.street], from_obj=[s]), """SELECT users.user_id, users.user_name, users.password, s.street FROM users, (SELECT addresses.street AS street FROM addresses WHERE addresses.user_id = users.user_id) AS s""") # test constructing the outer query via append_column(), which occurs in the ORM's Query object s = select([], exists([1], table2.c.otherid==table1.c.myid), from_obj=[table1]) s.append_column(table1) - self.runtest( + self.assert_compile( s, "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT 1 FROM myothertable WHERE myothertable.otherid = mytable.myid)" ) def testorderbysubquery(self): - self.runtest( + self.assert_compile( table1.select(order_by=[select([table2.c.otherid], table1.c.myid==table2.c.otherid)]), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable ORDER BY (SELECT myothertable.otherid FROM myothertable WHERE mytable.myid = myothertable.otherid)" ) - self.runtest( + self.assert_compile( table1.select(order_by=[desc(select([table2.c.otherid], table1.c.myid==table2.c.otherid))]), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable ORDER BY (SELECT myothertable.otherid FROM myothertable WHERE mytable.myid = myothertable.otherid) DESC" ) @@ -240,23 +228,23 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A def test_scalar_select(self): s = select([table1.c.myid], scalar=True, correlate=False) - self.runtest(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) FROM mytable") + self.assert_compile(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) FROM mytable") s = select([table1.c.myid], scalar=True) - self.runtest(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) FROM myothertable") + self.assert_compile(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) FROM myothertable") s = select([table1.c.myid]).correlate(None).as_scalar() - self.runtest(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) FROM mytable") + self.assert_compile(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) FROM mytable") s = select([table1.c.myid]).as_scalar() - self.runtest(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) FROM myothertable") + self.assert_compile(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) FROM myothertable") # test expressions against scalar selects - self.runtest(select([s - literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) - :literal") - self.runtest(select([select([table1.c.name]).as_scalar() + literal('x')]), "SELECT (SELECT mytable.name FROM mytable) || :literal") - self.runtest(select([s > literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) > :literal") + self.assert_compile(select([s - literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) - :literal") + self.assert_compile(select([select([table1.c.name]).as_scalar() + literal('x')]), "SELECT (SELECT mytable.name FROM mytable) || :literal") + self.assert_compile(select([s > literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) > :literal") - self.runtest(select([select([table1.c.name]).label('foo')]), "SELECT (SELECT mytable.name FROM mytable) AS foo") + self.assert_compile(select([select([table1.c.name]).label('foo')]), "SELECT (SELECT mytable.name FROM mytable) AS foo") # scalar selects should not have any attributes on their 'c' or 'columns' attribute s = select([table1.c.myid]).as_scalar() @@ -288,7 +276,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A order_by = ['dist', places.c.nm] ) - self.runtest(q,"SELECT places.id, places.nm, zips.zipcode, latlondist((SELECT zips.latitude FROM zips WHERE " + self.assert_compile(q,"SELECT places.id, places.nm, zips.zipcode, latlondist((SELECT zips.latitude FROM zips WHERE " "zips.zipcode = :zips_zipcode), (SELECT zips.longitude FROM zips WHERE zips.zipcode = :zips_zipcode_1)) AS dist " "FROM places, zips WHERE zips.zipcode = :zips_zipcode_2 ORDER BY dist, places.nm") @@ -298,26 +286,26 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A q = select([places.c.id, places.c.nm, zalias.c.zipcode, func.latlondist(qlat, qlng).label('dist')], order_by = ['dist', places.c.nm] ) - self.runtest(q, "SELECT places.id, places.nm, main_zip.zipcode, latlondist((SELECT zips.latitude FROM zips WHERE zips.zipcode = main_zip.zipcode), (SELECT zips.longitude FROM zips WHERE zips.zipcode = main_zip.zipcode)) AS dist FROM places, zips AS main_zip ORDER BY dist, places.nm") + self.assert_compile(q, "SELECT places.id, places.nm, main_zip.zipcode, latlondist((SELECT zips.latitude FROM zips WHERE zips.zipcode = main_zip.zipcode), (SELECT zips.longitude FROM zips WHERE zips.zipcode = main_zip.zipcode)) AS dist FROM places, zips AS main_zip ORDER BY dist, places.nm") a1 = table2.alias('t2alias') s1 = select([a1.c.otherid], table1.c.myid==a1.c.otherid, scalar=True) j1 = table1.join(table2, table1.c.myid==table2.c.otherid) s2 = select([table1, s1], from_obj=[j1]) - self.runtest(s2, "SELECT mytable.myid, mytable.name, mytable.description, (SELECT t2alias.otherid FROM myothertable AS t2alias WHERE mytable.myid = t2alias.otherid) FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid") + self.assert_compile(s2, "SELECT mytable.myid, mytable.name, mytable.description, (SELECT t2alias.otherid FROM myothertable AS t2alias WHERE mytable.myid = t2alias.otherid) FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid") def testlabelcomparison(self): x = func.lala(table1.c.myid).label('foo') - self.runtest(select([x], x==5), "SELECT lala(mytable.myid) AS foo FROM mytable WHERE lala(mytable.myid) = :literal") + self.assert_compile(select([x], x==5), "SELECT lala(mytable.myid) AS foo FROM mytable WHERE lala(mytable.myid) = :literal") def testand(self): - self.runtest( + self.assert_compile( select(['*'], and_(table1.c.myid == 12, table1.c.name=='asdf', table2.c.othername == 'foo', "sysdate() = today()")), "SELECT * FROM mytable, myothertable WHERE mytable.myid = :mytable_myid AND mytable.name = :mytable_name AND myothertable.othername = :myothertable_othername AND sysdate() = today()" ) def testor(self): - self.runtest( + self.assert_compile( select([table1], and_( table1.c.myid == 12, or_(table2.c.othername=='asdf', table2.c.othername == 'foo', table2.c.otherid == 9), @@ -328,23 +316,23 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A ) def testdistinct(self): - self.runtest( + self.assert_compile( select([table1.c.myid.distinct()]), "SELECT DISTINCT mytable.myid FROM mytable" ) - self.runtest( + self.assert_compile( select([distinct(table1.c.myid)]), "SELECT DISTINCT mytable.myid FROM mytable" ) - self.runtest( + self.assert_compile( select([table1.c.myid]).distinct(), "SELECT DISTINCT mytable.myid FROM mytable" ) - self.runtest( + self.assert_compile( select([func.count(table1.c.myid.distinct())]), "SELECT count(DISTINCT mytable.myid) FROM mytable" ) - self.runtest( + self.assert_compile( select([func.count(distinct(table1.c.myid))]), "SELECT count(DISTINCT mytable.myid) FROM mytable" ) @@ -364,7 +352,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A (literal(6), table1.c.myid, ':literal %s mytable.myid'), (literal(7), literal(5.5), ':literal %s :literal_1'), ): - self.runtest(py_op(lhs, rhs), res % sql_op) + self.assert_compile(py_op(lhs, rhs), res % sql_op) # exercise comparison operators for (py_op, fwd_op, rev_op) in ((operator.lt, '<', '>'), @@ -394,101 +382,95 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A "\n'" + compiled + "'\n does not match\n'" + fwd_sql + "'\n or\n'" + rev_sql + "'") - self.runtest( + self.assert_compile( table1.select((table1.c.myid != 12) & ~(table1.c.name=='john')), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND mytable.name != :mytable_name" ) - self.runtest( + self.assert_compile( table1.select((table1.c.myid != 12) & ~and_(table1.c.name=='john', table1.c.name=='ed', table1.c.name=='fred')), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND NOT (mytable.name = :mytable_name AND mytable.name = :mytable_name_1 AND mytable.name = :mytable_name_2)" ) - self.runtest( + self.assert_compile( table1.select((table1.c.myid != 12) & ~table1.c.name), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND NOT mytable.name" ) - self.runtest( + self.assert_compile( literal("a") + literal("b") * literal("c"), ":literal || :literal_1 * :literal_2" ) # test the op() function, also that its results are further usable in expressions - self.runtest( + self.assert_compile( table1.select(table1.c.myid.op('hoho')(12)==14), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (mytable.myid hoho :mytable_myid) = :literal" ) def testunicodestartswith(self): string = u"hi \xf6 \xf5" - self.runtest( + self.assert_compile( table1.select(table1.c.name.startswith(string)), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.name LIKE :mytable_name", checkparams = {'mytable_name': u'hi \xf6 \xf5%'}, ) def testmultiparam(self): - self.runtest( + self.assert_compile( select(["*"], or_(table1.c.myid == 12, table1.c.myid=='asdf', table1.c.myid == 'foo')), "SELECT * FROM mytable WHERE mytable.myid = :mytable_myid OR mytable.myid = :mytable_myid_1 OR mytable.myid = :mytable_myid_2" ) def testorderby(self): - self.runtest( + self.assert_compile( table2.select(order_by = [table2.c.otherid, asc(table2.c.othername)]), "SELECT myothertable.otherid, myothertable.othername FROM myothertable ORDER BY myothertable.otherid, myothertable.othername ASC" ) - self.runtest( + self.assert_compile( table2.select(order_by = [table2.c.otherid, table2.c.othername.desc()]), "SELECT myothertable.otherid, myothertable.othername FROM myothertable ORDER BY myothertable.otherid, myothertable.othername DESC" ) def testgroupby(self): - self.runtest( + self.assert_compile( select([table2.c.othername, func.count(table2.c.otherid)], group_by = [table2.c.othername]), "SELECT myothertable.othername, count(myothertable.otherid) FROM myothertable GROUP BY myothertable.othername" ) - def testoraclelimit(self): - metadata = MetaData() - users = Table('users', metadata, Column('name', String(10), key='username')) - s = select([users.c.username], limit=5) - self.runtest(s, "SELECT name FROM (SELECT users.name AS name, ROW_NUMBER() OVER (ORDER BY users.rowid) AS ora_rn FROM users) WHERE ora_rn<=5", dialect=oracle.dialect()) - self.runtest(s, "SELECT name FROM (SELECT users.name AS name, ROW_NUMBER() OVER (ORDER BY users.rowid) AS ora_rn FROM users) WHERE ora_rn<=5", dialect=oracle.dialect()) def testgroupby_and_orderby(self): - self.runtest( + self.assert_compile( select([table2.c.othername, func.count(table2.c.otherid)], group_by = [table2.c.othername], order_by = [table2.c.othername]), "SELECT myothertable.othername, count(myothertable.otherid) FROM myothertable GROUP BY myothertable.othername ORDER BY myothertable.othername" ) def testforupdate(self): - self.runtest(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid FOR UPDATE") + self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid FOR UPDATE") - self.runtest(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid FOR UPDATE") + self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid FOR UPDATE") - self.runtest(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid FOR UPDATE NOWAIT", dialect=oracle.dialect()) + self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid FOR UPDATE NOWAIT", dialect=oracle.dialect()) - self.runtest(table1.select(table1.c.myid==7, for_update="read"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = %s LOCK IN SHARE MODE", dialect=mysql.dialect()) + self.assert_compile(table1.select(table1.c.myid==7, for_update="read"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = %s LOCK IN SHARE MODE", dialect=mysql.dialect()) - self.runtest(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = %s FOR UPDATE", dialect=mysql.dialect()) + self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = %s FOR UPDATE", dialect=mysql.dialect()) - self.runtest(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid FOR UPDATE", dialect=oracle.dialect()) + self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid FOR UPDATE", dialect=oracle.dialect()) def testalias(self): # test the alias for a table1. column names stay the same, table name "changes" to "foo". - self.runtest( + self.assert_compile( select([table1.alias('foo')]) ,"SELECT foo.myid, foo.name, foo.description FROM mytable AS foo") for dialect in (firebird.dialect(), oracle.dialect()): - self.runtest( + self.assert_compile( select([table1.alias('foo')]) ,"SELECT foo.myid, foo.name, foo.description FROM mytable foo" ,dialect=dialect) - self.runtest( + self.assert_compile( select([table1.alias()]) ,"SELECT mytable_1.myid, mytable_1.name, mytable_1.description FROM mytable AS mytable_1") @@ -502,7 +484,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A # select from that alias, also using labels. two levels of labels should produce two underscores. # also, reference the column "mytable_myid" off of the t2view alias. - self.runtest( + self.assert_compile( a.select(a.c.mytable_myid == 9, use_labels = True), "SELECT t2view.mytable_myid AS t2view_mytable_myid, t2view.mytable_name AS t2view_mytable_name, \ t2view.mytable_description AS t2view_mytable_description, t2view.myothertable_otherid AS t2view_myothertable_otherid FROM \ @@ -513,18 +495,18 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = def test_prefixes(self): - self.runtest(table1.select().prefix_with("SQL_CALC_FOUND_ROWS").prefix_with("SQL_SOME_WEIRD_MYSQL_THING"), + self.assert_compile(table1.select().prefix_with("SQL_CALC_FOUND_ROWS").prefix_with("SQL_SOME_WEIRD_MYSQL_THING"), "SELECT SQL_CALC_FOUND_ROWS SQL_SOME_WEIRD_MYSQL_THING mytable.myid, mytable.name, mytable.description FROM mytable" ) def testtext(self): - self.runtest( + self.assert_compile( text("select * from foo where lala = bar") , "select * from foo where lala = bar" ) # test bytestring - self.runtest(select( + self.assert_compile(select( ["foobar(a)", "pk_foo_bar(syslaal)"], "a = 12", from_obj = ["foobar left outer join lala on foobar.foo = lala.foo"] @@ -532,7 +514,7 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar left outer join lala on foobar.foo = lala.foo WHERE a = 12") # test unicode - self.runtest(select( + self.assert_compile(select( [u"foobar(a)", u"pk_foo_bar(syslaal)"], u"a = 12", from_obj = [u"foobar left outer join lala on foobar.foo = lala.foo"] @@ -547,22 +529,22 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = s.append_whereclause("column2=19") s = s.order_by("column1") s.append_from("table1") - self.runtest(s, "SELECT column1, column2 FROM table1 WHERE column1=12 AND column2=19 ORDER BY column1") + self.assert_compile(s, "SELECT column1, column2 FROM table1 WHERE column1=12 AND column2=19 ORDER BY column1") def testtextcolumns(self): - self.runtest( + self.assert_compile( select(["column1", "column2"], from_obj=[table1]).alias('somealias').select(), "SELECT somealias.column1, somealias.column2 FROM (SELECT column1, column2 FROM mytable) AS somealias" ) # test that use_labels doesnt interfere with literal columns - self.runtest( + self.assert_compile( select(["column1", "column2", table1.c.myid], from_obj=[table1], use_labels=True), "SELECT column1, column2, mytable.myid AS mytable_myid FROM mytable" ) # test that use_labels doesnt interfere with literal columns that have textual labels - self.runtest( + self.assert_compile( select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=[table1], use_labels=True), "SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS mytable_myid FROM mytable" ) @@ -572,20 +554,20 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = print "---------------------------------------------" # test that "auto-labeling of subquery columns" doesnt interfere with literal columns, # exported columns dont get quoted - self.runtest( + self.assert_compile( select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=[table1]).select(), "SELECT column1 AS foobar, column2 AS hoho, myid FROM (SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS myid FROM mytable)" ) def testtextbinds(self): - self.runtest( + self.assert_compile( text("select * from foo where lala=:bar and hoho=:whee"), "select * from foo where lala=:bar and hoho=:whee", checkparams={'bar':4, 'whee': 7}, params={'bar':4, 'whee': 7, 'hoho':10}, ) - self.runtest( + self.assert_compile( text("select * from foo where clock='05:06:07'"), "select * from foo where clock='05:06:07'", checkparams={}, @@ -593,14 +575,14 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = ) dialect = postgres.dialect() - self.runtest( + self.assert_compile( text("select * from foo where lala=:bar and hoho=:whee"), "select * from foo where lala=%(bar)s and hoho=%(whee)s", checkparams={'bar':4, 'whee': 7}, params={'bar':4, 'whee': 7, 'hoho':10}, dialect=dialect ) - self.runtest( + self.assert_compile( text("select * from foo where clock='05:06:07' and mork='\:mindy'"), "select * from foo where clock='05:06:07' and mork=':mindy'", checkparams={}, @@ -609,7 +591,7 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = ) dialect = sqlite.dialect() - self.runtest( + self.assert_compile( text("select * from foo where lala=:bar and hoho=:whee"), "select * from foo where lala=? and hoho=?", checkparams=[4, 7], @@ -618,7 +600,7 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = ) def testtextmix(self): - self.runtest(select( + self.assert_compile(select( [table1, table2.c.otherid, "sysdate()", "foo, bar, lala"], and_( "foo.id = foofoo(lala)", @@ -630,7 +612,7 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today AND mytable.myid = myothertable.otherid") def testtextualsubquery(self): - self.runtest(select( + self.assert_compile(select( [alias(table1, 't'), "foo.f"], "foo.f = t.id", from_obj = ["(select f from bar where lala=heyhey) foo"] @@ -638,7 +620,7 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today "SELECT t.myid, t.name, t.description, foo.f FROM mytable AS t, (select f from bar where lala=heyhey) foo WHERE foo.f = t.id") def testliteral(self): - self.runtest(select([literal("foo") + literal("bar")], from_obj=[table1]), + self.assert_compile(select([literal("foo") + literal("bar")], from_obj=[table1]), "SELECT :literal || :literal_1 FROM mytable") def testcalculatedcolumns(self): @@ -648,19 +630,19 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today column('val2', Float), ) - self.runtest( + self.assert_compile( select([value_tbl.c.id, (value_tbl.c.val2 - value_tbl.c.val1)/value_tbl.c.val1]), "SELECT values.id, (values.val2 - values.val1) / values.val1 FROM values" ) - self.runtest( + self.assert_compile( select([value_tbl.c.id], (value_tbl.c.val2 - value_tbl.c.val1)/value_tbl.c.val1 > 2.0), "SELECT values.id FROM values WHERE (values.val2 - values.val1) / values.val1 > :literal" ) - self.runtest( + self.assert_compile( select([value_tbl.c.id], value_tbl.c.val1 / (value_tbl.c.val2 - value_tbl.c.val1) /value_tbl.c.val1 > 2.0), "SELECT values.id FROM values WHERE values.val1 / (values.val2 - values.val1) / values.val1 > :literal" ) @@ -668,27 +650,27 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today def testfunction(self): """tests the generation of functions using the func keyword""" # test an expression with a function - self.runtest(func.lala(3, 4, literal("five"), table1.c.myid) * table2.c.otherid, + self.assert_compile(func.lala(3, 4, literal("five"), table1.c.myid) * table2.c.otherid, "lala(:lala, :lala_1, :literal, mytable.myid) * myothertable.otherid") # test it in a SELECT - self.runtest(select([func.count(table1.c.myid)]), + self.assert_compile(select([func.count(table1.c.myid)]), "SELECT count(mytable.myid) FROM mytable") # test a "dotted" function name - self.runtest(select([func.foo.bar.lala(table1.c.myid)]), + self.assert_compile(select([func.foo.bar.lala(table1.c.myid)]), "SELECT foo.bar.lala(mytable.myid) FROM mytable") # test the bind parameter name with a "dotted" function name is only the name # (limits the length of the bind param name) - self.runtest(select([func.foo.bar.lala(12)]), + self.assert_compile(select([func.foo.bar.lala(12)]), "SELECT foo.bar.lala(:lala)") # test a dotted func off the engine itself - self.runtest(func.lala.hoho(7), "lala.hoho(:hoho)") + self.assert_compile(func.lala.hoho(7), "lala.hoho(:hoho)") # test None becomes NULL - self.runtest(func.my_func(1,2,None,3), "my_func(:my_func, :my_func_1, NULL, :my_func_2)") + self.assert_compile(func.my_func(1,2,None,3), "my_func(:my_func, :my_func_1, NULL, :my_func_2)") # assert func raises AttributeError for __bases__ attribute, since its not a class # fixes pydoc @@ -700,38 +682,38 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today def testextract(self): """test the EXTRACT function""" - self.runtest(select([extract("month", table3.c.otherstuff)]), "SELECT extract(month FROM thirdtable.otherstuff) FROM thirdtable") + self.assert_compile(select([extract("month", table3.c.otherstuff)]), "SELECT extract(month FROM thirdtable.otherstuff) FROM thirdtable") - self.runtest(select([extract("day", func.to_date("03/20/2005", "MM/DD/YYYY"))]), "SELECT extract(day FROM to_date(:to_date, :to_date_1))") + self.assert_compile(select([extract("day", func.to_date("03/20/2005", "MM/DD/YYYY"))]), "SELECT extract(day FROM to_date(:to_date, :to_date_1))") def testjoin(self): - self.runtest( + self.assert_compile( join(table2, table1, table1.c.myid == table2.c.otherid).select(), "SELECT myothertable.otherid, myothertable.othername, mytable.myid, mytable.name, \ mytable.description FROM myothertable JOIN mytable ON mytable.myid = myothertable.otherid" ) - self.runtest( + self.assert_compile( select( [table1], from_obj = [join(table1, table2, table1.c.myid == table2.c.otherid)] ), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid") - self.runtest( + self.assert_compile( select( [join(join(table1, table2, table1.c.myid == table2.c.otherid), table3, table1.c.myid == table3.c.userid) ]), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid JOIN thirdtable ON mytable.myid = thirdtable.userid" ) - self.runtest( + self.assert_compile( join(users, addresses, users.c.user_id==addresses.c.user_id).select(), "SELECT users.user_id, users.user_name, users.password, addresses.address_id, addresses.user_id, addresses.street, addresses.city, addresses.state, addresses.zip FROM users JOIN addresses ON users.user_id = addresses.user_id" ) def testmultijoin(self): - self.runtest( + self.assert_compile( select([table1, table2, table3], from_obj = [join(table1, table2, table1.c.myid == table2.c.otherid).outerjoin(table3, table1.c.myid==table3.c.userid)] @@ -740,7 +722,7 @@ mytable.description FROM myothertable JOIN mytable ON mytable.myid = myothertabl ) ,"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid LEFT OUTER JOIN thirdtable ON mytable.myid = thirdtable.userid" ) - self.runtest( + self.assert_compile( select([table1, table2, table3], from_obj = [outerjoin(table1, join(table2, table3, table2.c.otherid == table3.c.userid), table1.c.myid==table2.c.otherid)] ) @@ -754,12 +736,12 @@ mytable.description FROM myothertable JOIN mytable ON mytable.myid = myothertabl order_by = [table1.c.myid], ) - self.runtest(x, "SELECT mytable.myid, mytable.name, mytable.description \ + self.assert_compile(x, "SELECT mytable.myid, mytable.name, mytable.description \ FROM mytable WHERE mytable.myid = :mytable_myid UNION \ SELECT mytable.myid, mytable.name, mytable.description \ FROM mytable WHERE mytable.myid = :mytable_myid_1 ORDER BY mytable.myid") - self.runtest( + self.assert_compile( union( select([table1]), select([table2]), @@ -777,7 +759,7 @@ FROM myothertable UNION SELECT thirdtable.userid, thirdtable.otherstuff FROM thi ) assert u.corresponding_column(table2.c.otherid) is u.c.otherid - self.runtest( + self.assert_compile( union( select([table1]), select([table2]), @@ -791,7 +773,7 @@ FROM myothertable ORDER BY myid \ LIMIT 5 OFFSET 10" ) - self.runtest( + self.assert_compile( union( select([table1.c.myid, table1.c.name, func.max(table1.c.description)], table1.c.name=='name2', group_by=[table1.c.myid, table1.c.name]), table1.select(table1.c.name=='name1') @@ -803,7 +785,7 @@ FROM mytable WHERE mytable.name = :mytable_name_1" ) def test_compound_select_grouping(self): - self.runtest( + self.assert_compile( union_all( select([table1.c.myid]), union( @@ -816,7 +798,7 @@ FROM mytable WHERE mytable.name = :mytable_name_1" SELECT thirdtable.userid FROM thirdtable)" ) # This doesn't need grouping, so don't group to not give sqlite unnecessarily hard time - self.runtest( + self.assert_compile( union( except_( select([table2.c.otherid]), @@ -830,11 +812,6 @@ UNION SELECT mytable.myid FROM mytable" ) def testouterjoin(self): - # test an outer join. the oracle module should take the ON clause of the join and - # move it up to the WHERE clause of its parent select, and append (+) to all right-hand-side columns - # within the original onclause, but leave right-hand-side columns unchanged outside of the onclause - # parameters. - query = select( [table1, table2], or_( @@ -845,7 +822,7 @@ UNION SELECT mytable.myid FROM mytable" ), from_obj = [ outerjoin(table1, table2, table1.c.myid == table2.c.otherid) ] ) - self.runtest(query, + self.assert_compile(query, "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \ FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid = myothertable.otherid \ WHERE mytable.name = %(mytable_name)s OR mytable.myid = %(mytable_myid)s OR \ @@ -854,18 +831,6 @@ EXISTS (select yay from foo where boo = lar)", dialect=postgres.dialect() ) - - self.runtest(query, - "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \ -FROM mytable, myothertable WHERE mytable.myid = myothertable.otherid(+) AND \ -(mytable.name = :mytable_name OR mytable.myid = :mytable_myid OR \ -myothertable.othername != :myothertable_othername OR EXISTS (select yay from foo where boo = lar))", - dialect=oracle.OracleDialect(use_ansi = False)) - - query = table1.outerjoin(table2, table1.c.myid==table2.c.otherid).outerjoin(table3, table3.c.userid==table2.c.otherid) - self.runtest(query.select(), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid = myothertable.otherid LEFT OUTER JOIN thirdtable ON thirdtable.userid = myothertable.otherid") - self.runtest(query.select(), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable, myothertable, thirdtable WHERE mytable.myid = myothertable.otherid(+) AND thirdtable.userid(+) = myothertable.otherid", dialect=oracle.dialect(use_ansi=False)) - def testbindparam(self): for ( stmt, @@ -933,8 +898,8 @@ myothertable.othername != :myothertable_othername OR EXISTS (select yay from foo ), ]: - self.runtest(stmt, expected_named_stmt, params=expected_default_params_dict) - self.runtest(stmt, expected_positional_stmt, dialect=sqlite.dialect()) + self.assert_compile(stmt, expected_named_stmt, params=expected_default_params_dict) + self.assert_compile(stmt, expected_positional_stmt, dialect=sqlite.dialect()) nonpositional = stmt.compile() positional = stmt.compile(dialect=sqlite.dialect()) assert positional.get_params().get_raw_list() == expected_default_params_list @@ -975,68 +940,68 @@ myothertable.othername != :myothertable_othername OR EXISTS (select yay from foo t = table('foo', column('id')) s = select([t, literal('lala').label('hoho')]) - self.runtest(s, "SELECT foo.id, :literal AS hoho FROM foo") + self.assert_compile(s, "SELECT foo.id, :literal AS hoho FROM foo") assert [str(c) for c in s.c] == ["id", "hoho"] def testin(self): - self.runtest(select([table1], table1.c.myid.in_('a')), + self.assert_compile(select([table1], table1.c.myid.in_('a')), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid") - self.runtest(select([table1], table1.c.myid.in_('a', 'b')), + self.assert_compile(select([table1], table1.c.myid.in_('a', 'b')), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :mytable_myid_1)") - self.runtest(select([table1], table1.c.myid.in_(literal('a'))), + self.assert_compile(select([table1], table1.c.myid.in_(literal('a'))), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :literal") - self.runtest(select([table1], table1.c.myid.in_(literal('a'), 'b')), + self.assert_compile(select([table1], table1.c.myid.in_(literal('a'), 'b')), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal, :mytable_myid)") - self.runtest(select([table1], table1.c.myid.in_(literal('a'), literal('b'))), + self.assert_compile(select([table1], table1.c.myid.in_(literal('a'), literal('b'))), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal, :literal_1)") - self.runtest(select([table1], table1.c.myid.in_('a', literal('b'))), + self.assert_compile(select([table1], table1.c.myid.in_('a', literal('b'))), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :literal)") - self.runtest(select([table1], table1.c.myid.in_(literal(1) + 'a')), + self.assert_compile(select([table1], table1.c.myid.in_(literal(1) + 'a')), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :literal + :literal_1") - self.runtest(select([table1], table1.c.myid.in_(literal('a') +'a', 'b')), + self.assert_compile(select([table1], table1.c.myid.in_(literal('a') +'a', 'b')), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal || :literal_1, :mytable_myid)") - self.runtest(select([table1], table1.c.myid.in_(literal('a') + literal('a'), literal('b'))), + self.assert_compile(select([table1], table1.c.myid.in_(literal('a') + literal('a'), literal('b'))), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal || :literal_1, :literal_2)") - self.runtest(select([table1], table1.c.myid.in_(1, literal(3) + 4)), + self.assert_compile(select([table1], table1.c.myid.in_(1, literal(3) + 4)), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :literal + :literal_1)") - self.runtest(select([table1], table1.c.myid.in_(literal('a') < 'b')), + self.assert_compile(select([table1], table1.c.myid.in_(literal('a') < 'b')), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = (:literal < :literal_1)") - self.runtest(select([table1], table1.c.myid.in_(table1.c.myid)), + self.assert_compile(select([table1], table1.c.myid.in_(table1.c.myid)), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = mytable.myid") - self.runtest(select([table1], table1.c.myid.in_('a', table1.c.myid)), + self.assert_compile(select([table1], table1.c.myid.in_('a', table1.c.myid)), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, mytable.myid)") - self.runtest(select([table1], table1.c.myid.in_(literal('a'), table1.c.myid)), + self.assert_compile(select([table1], table1.c.myid.in_(literal('a'), table1.c.myid)), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal, mytable.myid)") - self.runtest(select([table1], table1.c.myid.in_(literal('a'), table1.c.myid +'a')), + self.assert_compile(select([table1], table1.c.myid.in_(literal('a'), table1.c.myid +'a')), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal, mytable.myid + :mytable_myid)") - self.runtest(select([table1], table1.c.myid.in_(literal(1), 'a' + table1.c.myid)), + self.assert_compile(select([table1], table1.c.myid.in_(literal(1), 'a' + table1.c.myid)), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal, :mytable_myid + mytable.myid)") - self.runtest(select([table1], table1.c.myid.in_(1, 2, 3)), + self.assert_compile(select([table1], table1.c.myid.in_(1, 2, 3)), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :mytable_myid_1, :mytable_myid_2)") - self.runtest(select([table1], table1.c.myid.in_(select([table2.c.otherid]))), + self.assert_compile(select([table1], table1.c.myid.in_(select([table2.c.otherid]))), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (SELECT myothertable.otherid FROM myothertable)") - self.runtest(select([table1], ~table1.c.myid.in_(select([table2.c.otherid]))), + self.assert_compile(select([table1], ~table1.c.myid.in_(select([table2.c.otherid]))), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid NOT IN (SELECT myothertable.otherid FROM myothertable)") - self.runtest(select([table1], table1.c.myid.in_( + self.assert_compile(select([table1], table1.c.myid.in_( union( select([table1], table1.c.myid == 5), select([table1], table1.c.myid == 12), @@ -1047,7 +1012,7 @@ SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytabl UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid_1)") # test that putting a select in an IN clause does not blow away its ORDER BY clause - self.runtest( + self.assert_compile( select([table1, table2], table2.c.otherid.in_( select([table2.c.otherid], order_by=[table2.c.othername], limit=10, correlate=False) @@ -1058,7 +1023,7 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE ) # test empty in clause - self.runtest(select([table1], table1.c.myid.in_()), + self.assert_compile(select([table1], table1.c.myid.in_()), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (CASE WHEN (mytable.myid IS NULL) THEN NULL ELSE 0 END = 1)") @@ -1096,82 +1061,82 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE import datetime table = Table('dt', metadata, Column('date', Date)) - self.runtest(table.select(table.c.date.between(datetime.date(2006,6,1), datetime.date(2006,6,5))), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :dt_date AND :dt_date_1", checkparams={'dt_date':datetime.date(2006,6,1), 'dt_date_1':datetime.date(2006,6,5)}) + self.assert_compile(table.select(table.c.date.between(datetime.date(2006,6,1), datetime.date(2006,6,5))), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :dt_date AND :dt_date_1", checkparams={'dt_date':datetime.date(2006,6,1), 'dt_date_1':datetime.date(2006,6,5)}) - self.runtest(table.select(sql.between(table.c.date, datetime.date(2006,6,1), datetime.date(2006,6,5))), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :literal AND :literal_1", checkparams={'literal':datetime.date(2006,6,1), 'literal_1':datetime.date(2006,6,5)}) + self.assert_compile(table.select(sql.between(table.c.date, datetime.date(2006,6,1), datetime.date(2006,6,5))), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :literal AND :literal_1", checkparams={'literal':datetime.date(2006,6,1), 'literal_1':datetime.date(2006,6,5)}) def test_operator_precedence(self): table = Table('op', metadata, Column('field', Integer)) - self.runtest(table.select((table.c.field == 5) == None), + self.assert_compile(table.select((table.c.field == 5) == None), "SELECT op.field FROM op WHERE (op.field = :op_field) IS NULL") - self.runtest(table.select((table.c.field + 5) == table.c.field), + self.assert_compile(table.select((table.c.field + 5) == table.c.field), "SELECT op.field FROM op WHERE op.field + :op_field = op.field") - self.runtest(table.select((table.c.field + 5) * 6), + self.assert_compile(table.select((table.c.field + 5) * 6), "SELECT op.field FROM op WHERE (op.field + :op_field) * :literal") - self.runtest(table.select((table.c.field * 5) + 6), + self.assert_compile(table.select((table.c.field * 5) + 6), "SELECT op.field FROM op WHERE op.field * :op_field + :literal") - self.runtest(table.select(5 + table.c.field.in_(5,6)), + self.assert_compile(table.select(5 + table.c.field.in_(5,6)), "SELECT op.field FROM op WHERE :literal + (op.field IN (:op_field, :op_field_1))") - self.runtest(table.select((5 + table.c.field).in_(5,6)), + self.assert_compile(table.select((5 + table.c.field).in_(5,6)), "SELECT op.field FROM op WHERE :op_field + op.field IN (:literal, :literal_1)") - self.runtest(table.select(not_(and_(table.c.field == 5, table.c.field == 7))), + self.assert_compile(table.select(not_(and_(table.c.field == 5, table.c.field == 7))), "SELECT op.field FROM op WHERE NOT (op.field = :op_field AND op.field = :op_field_1)") - self.runtest(table.select(not_(table.c.field) == 5), + self.assert_compile(table.select(not_(table.c.field) == 5), "SELECT op.field FROM op WHERE (NOT op.field) = :literal") - self.runtest(table.select((table.c.field == table.c.field).between(False, True)), + self.assert_compile(table.select((table.c.field == table.c.field).between(False, True)), "SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :literal AND :literal_1") - self.runtest(table.select(between((table.c.field == table.c.field), False, True)), + self.assert_compile(table.select(between((table.c.field == table.c.field), False, True)), "SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :literal AND :literal_1") -class CRUDTest(SQLTest): +class CRUDTest(SQLCompileTest): def testinsert(self): # generic insert, will create bind params for all columns - self.runtest(insert(table1), "INSERT INTO mytable (myid, name, description) VALUES (:myid, :name, :description)") + self.assert_compile(insert(table1), "INSERT INTO mytable (myid, name, description) VALUES (:myid, :name, :description)") # insert with user-supplied bind params for specific columns, # cols provided literally - self.runtest( + self.assert_compile( insert(table1, {table1.c.myid : bindparam('userid'), table1.c.name : bindparam('username')}), "INSERT INTO mytable (myid, name) VALUES (:userid, :username)") # insert with user-supplied bind params for specific columns, cols # provided as strings - self.runtest( + self.assert_compile( insert(table1, dict(myid = 3, name = 'jack')), "INSERT INTO mytable (myid, name) VALUES (:myid, :name)" ) # test with a tuple of params instead of named - self.runtest( + self.assert_compile( insert(table1, (3, 'jack', 'mydescription')), "INSERT INTO mytable (myid, name, description) VALUES (:myid, :name, :description)", checkparams = {'myid':3, 'name':'jack', 'description':'mydescription'} ) - self.runtest( + self.assert_compile( insert(table1, values={table1.c.myid : bindparam('userid')}).values({table1.c.name : bindparam('username')}), "INSERT INTO mytable (myid, name) VALUES (:userid, :username)" ) def testinsertexpression(self): - self.runtest(insert(table1), "INSERT INTO mytable (myid) VALUES (lala())", params=dict(myid=func.lala())) + self.assert_compile(insert(table1), "INSERT INTO mytable (myid) VALUES (lala())", params=dict(myid=func.lala())) def testupdate(self): - self.runtest(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :mytable_myid", params = {table1.c.name:'fred'}) - self.runtest(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :mytable_myid", params = {'name':'fred'}) - self.runtest(update(table1, values = {table1.c.name : table1.c.myid}), "UPDATE mytable SET name=mytable.myid") - self.runtest(update(table1, whereclause = table1.c.name == bindparam('crit'), values = {table1.c.name : 'hi'}), "UPDATE mytable SET name=:name WHERE mytable.name = :crit", params = {'crit' : 'notthere'}) - self.runtest(update(table1, table1.c.myid == 12, values = {table1.c.name : table1.c.myid}), "UPDATE mytable SET name=mytable.myid, description=:description WHERE mytable.myid = :mytable_myid", params = {'description':'test'}) - self.runtest(update(table1, table1.c.myid == 12, values = {table1.c.myid : 9}), "UPDATE mytable SET myid=:myid, description=:description WHERE mytable.myid = :mytable_myid", params = {'mytable_myid': 12, 'myid': 9, 'description': 'test'}) + self.assert_compile(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :mytable_myid", params = {table1.c.name:'fred'}) + self.assert_compile(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :mytable_myid", params = {'name':'fred'}) + self.assert_compile(update(table1, values = {table1.c.name : table1.c.myid}), "UPDATE mytable SET name=mytable.myid") + self.assert_compile(update(table1, whereclause = table1.c.name == bindparam('crit'), values = {table1.c.name : 'hi'}), "UPDATE mytable SET name=:name WHERE mytable.name = :crit", params = {'crit' : 'notthere'}) + self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.name : table1.c.myid}), "UPDATE mytable SET name=mytable.myid, description=:description WHERE mytable.myid = :mytable_myid", params = {'description':'test'}) + self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.myid : 9}), "UPDATE mytable SET myid=:myid, description=:description WHERE mytable.myid = :mytable_myid", params = {'mytable_myid': 12, 'myid': 9, 'description': 'test'}) s = table1.update(table1.c.myid == 12, values = {table1.c.name : 'lala'}) c = s.compile(parameters = {'mytable_id':9,'name':'h0h0'}) - self.runtest(update(table1, table1.c.myid == 12, values = {table1.c.name : table1.c.myid}).values({table1.c.name:table1.c.name + 'foo'}), "UPDATE mytable SET name=(mytable.name || :mytable_name), description=:description WHERE mytable.myid = :mytable_myid", params = {'description':'test'}) + self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.name : table1.c.myid}).values({table1.c.name:table1.c.name + 'foo'}), "UPDATE mytable SET name=(mytable.name || :mytable_name), description=:description WHERE mytable.myid = :mytable_myid", params = {'description':'test'}) self.assert_(str(s) == str(c)) def testupdateexpression(self): - self.runtest(update(table1, + self.assert_compile(update(table1, (table1.c.myid == func.hoho(4)) & (table1.c.name == literal('foo') + table1.c.name + literal('lala')), values = { @@ -1183,60 +1148,60 @@ class CRUDTest(SQLTest): def testcorrelatedupdate(self): # test against a straight text subquery u = update(table1, values = {table1.c.name : text("(select name from mytable where id=mytable.id)")}) - self.runtest(u, "UPDATE mytable SET name=(select name from mytable where id=mytable.id)") + self.assert_compile(u, "UPDATE mytable SET name=(select name from mytable where id=mytable.id)") mt = table1.alias() u = update(table1, values = {table1.c.name : select([mt.c.name], mt.c.myid==table1.c.myid)}) - self.runtest(u, "UPDATE mytable SET name=(SELECT mytable_1.name FROM mytable AS mytable_1 WHERE mytable_1.myid = mytable.myid)") + self.assert_compile(u, "UPDATE mytable SET name=(SELECT mytable_1.name FROM mytable AS mytable_1 WHERE mytable_1.myid = mytable.myid)") # test against a regular constructed subquery s = select([table2], table2.c.otherid == table1.c.myid) u = update(table1, table1.c.name == 'jack', values = {table1.c.name : s}) - self.runtest(u, "UPDATE mytable SET name=(SELECT myothertable.otherid, myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid) WHERE mytable.name = :mytable_name") + self.assert_compile(u, "UPDATE mytable SET name=(SELECT myothertable.otherid, myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid) WHERE mytable.name = :mytable_name") # test a non-correlated WHERE clause s = select([table2.c.othername], table2.c.otherid == 7) u = update(table1, table1.c.name==s) - self.runtest(u, "UPDATE mytable SET myid=:myid, name=:name, description=:description WHERE mytable.name = (SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = :myothertable_otherid)") + self.assert_compile(u, "UPDATE mytable SET myid=:myid, name=:name, description=:description WHERE mytable.name = (SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = :myothertable_otherid)") # test one that is actually correlated... s = select([table2.c.othername], table2.c.otherid == table1.c.myid) u = table1.update(table1.c.name==s) - self.runtest(u, "UPDATE mytable SET myid=:myid, name=:name, description=:description WHERE mytable.name = (SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid)") + self.assert_compile(u, "UPDATE mytable SET myid=:myid, name=:name, description=:description WHERE mytable.name = (SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid)") def testdelete(self): - self.runtest(delete(table1, table1.c.myid == 7), "DELETE FROM mytable WHERE mytable.myid = :mytable_myid") + self.assert_compile(delete(table1, table1.c.myid == 7), "DELETE FROM mytable WHERE mytable.myid = :mytable_myid") def testcorrelateddelete(self): # test a non-correlated WHERE clause s = select([table2.c.othername], table2.c.otherid == 7) u = delete(table1, table1.c.name==s) - self.runtest(u, "DELETE FROM mytable WHERE mytable.name = (SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = :myothertable_otherid)") + self.assert_compile(u, "DELETE FROM mytable WHERE mytable.name = (SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = :myothertable_otherid)") # test one that is actually correlated... s = select([table2.c.othername], table2.c.otherid == table1.c.myid) u = table1.delete(table1.c.name==s) - self.runtest(u, "DELETE FROM mytable WHERE mytable.name = (SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid)") + self.assert_compile(u, "DELETE FROM mytable WHERE mytable.name = (SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid)") -class SchemaTest(SQLTest): +class SchemaTest(SQLCompileTest): def testselect(self): # these tests will fail with the MS-SQL compiler since it will alias schema-qualified tables - self.runtest(table4.select(), "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable") - self.runtest(table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')), "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable WHERE remotetable.datatype_id = :remotetable_datatype_id AND remotetable.value = :remotetable_value") + self.assert_compile(table4.select(), "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable") + self.assert_compile(table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')), "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable WHERE remotetable.datatype_id = :remotetable_datatype_id AND remotetable.value = :remotetable_value") s = table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')) s.use_labels = True - self.runtest(s, "SELECT remotetable.rem_id AS remotetable_rem_id, remotetable.datatype_id AS remotetable_datatype_id, remotetable.value AS remotetable_value FROM remote_owner.remotetable WHERE remotetable.datatype_id = :remotetable_datatype_id AND remotetable.value = :remotetable_value") + self.assert_compile(s, "SELECT remotetable.rem_id AS remotetable_rem_id, remotetable.datatype_id AS remotetable_datatype_id, remotetable.value AS remotetable_value FROM remote_owner.remotetable WHERE remotetable.datatype_id = :remotetable_datatype_id AND remotetable.value = :remotetable_value") def testalias(self): a = alias(table4, 'remtable') - self.runtest(a.select(a.c.datatype_id==7), "SELECT remtable.rem_id, remtable.datatype_id, remtable.value FROM remote_owner.remotetable AS remtable WHERE remtable.datatype_id = :remtable_datatype_id") + self.assert_compile(a.select(a.c.datatype_id==7), "SELECT remtable.rem_id, remtable.datatype_id, remtable.value FROM remote_owner.remotetable AS remtable WHERE remtable.datatype_id = :remtable_datatype_id") def testupdate(self): - self.runtest(table4.update(table4.c.value=='test', values={table4.c.datatype_id:12}), "UPDATE remote_owner.remotetable SET datatype_id=:datatype_id WHERE remotetable.value = :remotetable_value") + self.assert_compile(table4.update(table4.c.value=='test', values={table4.c.datatype_id:12}), "UPDATE remote_owner.remotetable SET datatype_id=:datatype_id WHERE remotetable.value = :remotetable_value") def testinsert(self): - self.runtest(table4.insert(values=(2, 5, 'test')), "INSERT INTO remote_owner.remotetable (rem_id, datatype_id, value) VALUES (:rem_id, :datatype_id, :value)") + self.assert_compile(table4.insert(values=(2, 5, 'test')), "INSERT INTO remote_owner.remotetable (rem_id, datatype_id, value) VALUES (:rem_id, :datatype_id, :value)") if __name__ == "__main__": testbase.main() diff --git a/test/testlib/__init__.py b/test/testlib/__init__.py index 3967863b84..bfc2ba04b2 100644 --- a/test/testlib/__init__.py +++ b/test/testlib/__init__.py @@ -6,11 +6,11 @@ Load after sqlalchemy imports to use instrumented stand-ins like Table. import testlib.config from testlib.schema import Table, Column import testlib.testing as testing -from testlib.testing import PersistTest, AssertMixin, ORMTest +from testlib.testing import PersistTest, AssertMixin, ORMTest, SQLCompileTest import testlib.profiling import testlib.engines __all__ = ('testing', 'Table', 'Column', - 'PersistTest', 'AssertMixin', 'ORMTest') + 'PersistTest', 'AssertMixin', 'ORMTest', 'SQLCompileTest') diff --git a/test/testlib/testing.py b/test/testlib/testing.py index 15a9fd768d..6c1d7ad810 100644 --- a/test/testlib/testing.py +++ b/test/testlib/testing.py @@ -9,7 +9,7 @@ import testlib.config as config sql, MetaData, clear_mappers = None, None, None -__all__ = 'PersistTest', 'AssertMixin', 'ORMTest' +__all__ = ('PersistTest', 'AssertMixin', 'ORMTest', 'SQLCompileTest') _ops = { '<': operator.lt, '>': operator.gt, @@ -206,6 +206,25 @@ class PersistTest(unittest.TestCase): """overridden to not return docstrings""" return None +class SQLCompileTest(PersistTest): + def assert_compile(self, clause, result, params=None, checkparams=None, dialect=None): + if dialect is None: + dialect = getattr(self, '__dialect__', None) + + c = clause.compile(parameters=params, dialect=dialect) + + print "\nSQL String:\n" + str(c) + repr(c.get_params()) + + cc = re.sub(r'\n', '', str(c)) + + self.assert_(cc == result, "\n'" + cc + "'\n does not match \n'" + result + "'") + + if checkparams is not None: + if isinstance(checkparams, list): + self.assert_(c.get_params().get_raw_list() == checkparams, "params dont match ") + else: + self.assert_(c.get_params().get_original_dict() == checkparams, "params dont match" + repr(c.get_params())) + class AssertMixin(PersistTest): """given a list-based structure of keys/properties which represent information within an object structure, and a list of actual objects, asserts that the list of objects corresponds to the structure."""