From: Mike Bayer Date: Fri, 1 Feb 2008 01:16:18 +0000 (+0000) Subject: - some consolidation of tests in select.py, moved X-Git-Tag: rel_0_4_3~54 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=a0ffeb546468e14fa9d99b30571f7b8f9b32f421;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - some consolidation of tests in select.py, moved other tests to more specific modules - added "now()" as a generic function; on SQLite and Oracle compiles as "CURRENT_TIMESTAMP"; "now()" on all others [ticket:943] --- diff --git a/CHANGES b/CHANGES index 6070b97d8b..b1c1070bd0 100644 --- a/CHANGES +++ b/CHANGES @@ -9,6 +9,10 @@ CHANGES to ILIKE on postgres, lower(x) LIKE lower(y) on all others. [ticket:727] + - added "now()" as a generic function; on SQLite and + Oracle compiles as "CURRENT_TIMESTAMP"; "now()" + on all others [ticket:943] + - the startswith(), endswith(), and contains() operators now concatenate the wildcard operator with the given operand in SQL, i.e. "'%' || " in all cases, diff --git a/lib/sqlalchemy/databases/oracle.py b/lib/sqlalchemy/databases/oracle.py index 55bdf74b1f..23c159ef7d 100644 --- a/lib/sqlalchemy/databases/oracle.py +++ b/lib/sqlalchemy/databases/oracle.py @@ -10,7 +10,7 @@ import datetime, random, re from sqlalchemy import util, sql, schema, exceptions, logging from sqlalchemy.engine import default, base from sqlalchemy.sql import compiler, visitors -from sqlalchemy.sql import operators as sql_operators +from sqlalchemy.sql import operators as sql_operators, functions as sql_functions from sqlalchemy import types as sqltypes @@ -586,6 +586,13 @@ class OracleCompiler(compiler.DefaultCompiler): } ) + functions = compiler.DefaultCompiler.functions.copy() + functions.update ( + { + sql_functions.now : 'CURRENT_TIMESTAMP' + } + ) + def __init__(self, *args, **kwargs): super(OracleCompiler, self).__init__(*args, **kwargs) self.__wheres = {} diff --git a/lib/sqlalchemy/databases/sqlite.py b/lib/sqlalchemy/databases/sqlite.py index 4ce8987272..a5a5a2ed90 100644 --- a/lib/sqlalchemy/databases/sqlite.py +++ b/lib/sqlalchemy/databases/sqlite.py @@ -11,7 +11,7 @@ from sqlalchemy import schema, exceptions, pool, PassiveDefault from sqlalchemy.engine import default import sqlalchemy.types as sqltypes import sqlalchemy.util as util -from sqlalchemy.sql import compiler +from sqlalchemy.sql import compiler, functions as sql_functions SELECT_REGEXP = re.compile(r'\s*(?:SELECT|PRAGMA)', re.I | re.UNICODE) @@ -349,6 +349,13 @@ class SQLiteDialect(default.DefaultDialect): class SQLiteCompiler(compiler.DefaultCompiler): + functions = compiler.DefaultCompiler.functions.copy() + functions.update ( + { + sql_functions.now : 'CURRENT_TIMESTAMP' + } + ) + def visit_cast(self, cast, **kwargs): if self.dialect.supports_cast: return super(SQLiteCompiler, self).visit_cast(cast) diff --git a/lib/sqlalchemy/sql/functions.py b/lib/sqlalchemy/sql/functions.py index 869df46a7e..be1d8eb611 100644 --- a/lib/sqlalchemy/sql/functions.py +++ b/lib/sqlalchemy/sql/functions.py @@ -35,6 +35,9 @@ class coalesce(GenericFunction): kwargs.setdefault('type_', _type_from_args(args)) GenericFunction.__init__(self, args=args, **kwargs) +class now(GenericFunction): + __return_type__ = sqltypes.DateTime + class concat(GenericFunction): __return_type__ = sqltypes.String def __init__(self, *args, **kwargs): diff --git a/test/dialect/mssql.py b/test/dialect/mssql.py index 0b7a8444b7..4fb918c7f3 100755 --- a/test/dialect/mssql.py +++ b/test/dialect/mssql.py @@ -21,6 +21,39 @@ class CompileTest(SQLCompileTest): t = table('sometable', column('somecolumn')) self.assert_compile(t.count(), "SELECT count(sometable.somecolumn) AS tbl_row_count FROM sometable") + def test_noorderby_insubquery(self): + """test that the ms-sql dialect removes ORDER BY clauses from subqueries""" + + table1 = table('mytable', + column('myid', Integer), + column('name', String), + column('description', String), + ) + + q = select([table1.c.myid], order_by=[table1.c.myid]).alias('foo') + crit = q.c.myid == table1.c.myid + self.assert_compile(select(['*'], crit), """SELECT * FROM (SELECT mytable.myid AS myid FROM mytable) AS foo, mytable WHERE foo.myid = mytable.myid""") + + def test_aliases_schemas(self): + metadata = MetaData() + table1 = table('mytable', + column('myid', Integer), + column('name', String), + column('description', String), + ) + + table4 = Table( + 'remotetable', metadata, + Column('rem_id', Integer, primary_key=True), + Column('datatype_id', Integer), + Column('value', String(20)), + schema = 'remote_owner' + ) + + self.assert_compile(table4.select(), "SELECT remotetable_1.rem_id, remotetable_1.datatype_id, remotetable_1.value FROM remote_owner.remotetable AS remotetable_1") + + self.assert_compile(table1.join(table4, table1.c.myid==table4.c.rem_id).select(), "SELECT mytable.myid, mytable.name, mytable.description, remotetable_1.rem_id, remotetable_1.datatype_id, remotetable_1.value FROM mytable JOIN remote_owner.remotetable AS remotetable_1 ON remotetable_1.rem_id = mytable.myid") + def test_union(self): t1 = table('t1', column('col1'), diff --git a/test/sql/functions.py b/test/sql/functions.py index 08b4615b9e..fe4bbd7acc 100644 --- a/test/sql/functions.py +++ b/test/sql/functions.py @@ -1,7 +1,8 @@ import testenv; testenv.configure_for_tests() import datetime from sqlalchemy import * -from sqlalchemy import databases, exceptions, sql +from sqlalchemy.sql import table, column +from sqlalchemy import databases, exceptions, sql, util from sqlalchemy.sql.compiler import BIND_TEMPLATES from sqlalchemy.engine import default from sqlalchemy import types as sqltypes @@ -32,6 +33,17 @@ class CompileTest(SQLCompileTest): self.assert_compile(func.nosuchfunction(), "nosuchfunction()", dialect=dialect) self.assert_compile(func.char_length('foo'), "char_length(%s)" % bindtemplate % {'name':'param_1', 'position':1}, dialect=dialect) + def test_generic_now(self): + assert isinstance(func.now().type, sqltypes.DateTime) + + for ret, dialect in [ + ('CURRENT_TIMESTAMP', sqlite.dialect()), + ('now()', postgres.dialect()), + ('now()', mysql.dialect()), + ('CURRENT_TIMESTAMP', oracle.dialect()) + ]: + self.assert_compile(func.now(), ret, dialect=dialect) + def test_constructor(self): try: func.current_timestamp('somearg') @@ -58,6 +70,75 @@ class CompileTest(SQLCompileTest): assert isinstance(func.concat("foo", "bar").type, sqltypes.String) + def test_assorted(self): + table1 = table('mytable', + column('myid', Integer), + ) + + table2 = table( + 'myothertable', + column('otherid', Integer), + ) + + # test an expression with a function + self.assert_compile(func.lala(3, 4, literal("five"), table1.c.myid) * table2.c.otherid, + "lala(:lala_1, :lala_2, :param_1, mytable.myid) * myothertable.otherid") + + # test it in a SELECT + self.assert_compile(select([func.count(table1.c.myid)]), + "SELECT count(mytable.myid) AS count_1 FROM mytable") + + # test a "dotted" function name + self.assert_compile(select([func.foo.bar.lala(table1.c.myid)]), + "SELECT foo.bar.lala(mytable.myid) AS lala_1 FROM mytable") + + # test the bind parameter name with a "dotted" function name is only the name + # (limits the length of the bind param name) + self.assert_compile(select([func.foo.bar.lala(12)]), + "SELECT foo.bar.lala(:lala_2) AS lala_1") + + # test a dotted func off the engine itself + self.assert_compile(func.lala.hoho(7), "lala.hoho(:hoho_1)") + + # test None becomes NULL + self.assert_compile(func.my_func(1,2,None,3), "my_func(:my_func_1, :my_func_2, NULL, :my_func_3)") + + # test pickling + self.assert_compile(util.pickle.loads(util.pickle.dumps(func.my_func(1, 2, None, 3))), "my_func(:my_func_1, :my_func_2, NULL, :my_func_3)") + + # assert func raises AttributeError for __bases__ attribute, since its not a class + # fixes pydoc + try: + func.__bases__ + assert False + except AttributeError: + assert True + + def test_functions_with_cols(self): + users = table('users', column('id'), column('name'), column('fullname')) + calculate = select([column('q'), column('z'), column('r')], + from_obj=[func.calculate(bindparam('x'), bindparam('y'))]) + + self.assert_compile(select([users], users.c.id > calculate.c.z), + "SELECT users.id, users.name, users.fullname " + "FROM users, (SELECT q, z, r " + "FROM calculate(:x, :y)) " + "WHERE users.id > z" + ) + + s = select([users], users.c.id.between( + calculate.alias('c1').unique_params(x=17, y=45).c.z, + calculate.alias('c2').unique_params(x=5, y=12).c.z)) + + self.assert_compile(s, + "SELECT users.id, users.name, users.fullname " + "FROM users, (SELECT q, z, r " + "FROM calculate(:x_1, :y_1)) AS c1, (SELECT q, z, r " + "FROM calculate(:x_2, :y_2)) AS c2 " + "WHERE users.id BETWEEN c1.z AND c2.z" + , checkparams={'y_1': 45, 'x_1': 17, 'y_2': 12, 'x_2': 5}) + + class ExecuteTest(PersistTest): def test_standalone_execute(self): diff --git a/test/sql/select.py b/test/sql/select.py index 3585a2cd7c..9d561eb2d4 100644 --- a/test/sql/select.py +++ b/test/sql/select.py @@ -6,11 +6,6 @@ from sqlalchemy.sql import table, column from sqlalchemy.databases import sqlite, postgres, mysql, oracle, firebird, mssql from testlib import * - -# the select test now tests almost completely with TableClause/ColumnClause objects, -# which are free-roaming table/column objects not attached to any database. -# so SQLAlchemy's SQL construction engine can be used with no database dependencies at all. - table1 = table('mytable', column('myid', Integer), column('name', String), @@ -68,15 +63,16 @@ class SelectTest(SQLCompileTest): assert not hasattr(table1.alias().c.myid, 'columns') assert not hasattr(table1.alias().c.myid, 'c') - def testtableselect(self): + def test_table_select(self): self.assert_compile(table1.select(), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable") self.assert_compile(select([table1, table2]), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, \ myothertable.othername FROM mytable, myothertable") - def testselectselect(self): + def test_from_subquery(self): """tests placing select statements in the column clause of another select, for the purposes of selecting from the exported columns of that select.""" + s = select([table1], table1.c.name == 'jack') self.assert_compile( select( @@ -143,55 +139,37 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A "anon_2.description AS anon_2_description FROM (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description "\ "AS description FROM mytable) AS anon_2) AS anon_1") - def testmssql_noorderbyinsubquery(self): - """test that the ms-sql dialect removes ORDER BY clauses from subqueries""" - dialect = mssql.dialect() - q = select([table1.c.myid], order_by=[table1.c.myid]).alias('foo') - crit = q.c.myid == table1.c.myid - self.assert_compile(select(['*'], crit), """SELECT * FROM (SELECT mytable.myid AS myid FROM mytable ORDER BY mytable.myid) AS foo, mytable WHERE foo.myid = mytable.myid""", dialect=sqlite.dialect()) - self.assert_compile(select(['*'], crit), """SELECT * FROM (SELECT mytable.myid AS myid FROM mytable) AS foo, mytable WHERE foo.myid = mytable.myid""", dialect=mssql.dialect()) - - def testmssql_aliases_schemas(self): - self.assert_compile(table4.select(), "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable") - - dialect = mssql.dialect() - self.assert_compile(table4.select(), "SELECT remotetable_1.rem_id, remotetable_1.datatype_id, remotetable_1.value FROM remote_owner.remotetable AS remotetable_1", dialect=dialect) - - # TODO: this is probably incorrect; no "AS " is being applied to the table - self.assert_compile(table1.join(table4, table1.c.myid==table4.c.rem_id).select(), "SELECT mytable.myid, mytable.name, mytable.description, remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM mytable JOIN remote_owner.remotetable ON remotetable.rem_id = mytable.myid") - - def testdontovercorrelate(self): + def test_dont_overcorrelate(self): self.assert_compile(select([table1], from_obj=[table1, table1.select()]), """SELECT mytable.myid, mytable.name, mytable.description FROM mytable, (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable)""") - def testexistsascolumnclause(self): + def test_exists(self): self.assert_compile(exists([table1.c.myid], table1.c.myid==5).select(), "SELECT EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :mytable_myid_1)", params={'mytable_myid':5}) self.assert_compile(select([table1, exists([1], from_obj=table2)]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) FROM mytable", params={}) self.assert_compile(select([table1, exists([1], from_obj=table2).label('foo')]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) AS foo FROM mytable", params={}) - def test_generative_exists(self): - self.assert_compile( + self.assert_compile( table1.select(exists([1], table2.c.otherid == table1.c.myid).correlate(table1)), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT 1 FROM myothertable WHERE myothertable.otherid = mytable.myid)" - ) + ) - self.assert_compile( + self.assert_compile( table1.select(exists([1]).where(table2.c.otherid == table1.c.myid).correlate(table1)), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT 1 FROM myothertable WHERE myothertable.otherid = mytable.myid)" - ) + ) - self.assert_compile( + self.assert_compile( table1.select(exists([1]).where(table2.c.otherid == table1.c.myid).correlate(table1)).replace_selectable(table2, table2.alias()), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT 1 FROM myothertable AS myothertable_1 WHERE myothertable_1.otherid = mytable.myid)" - ) + ) - self.assert_compile( + self.assert_compile( table1.select(exists([1]).where(table2.c.otherid == table1.c.myid).correlate(table1)).select_from(table1.join(table2, table1.c.myid==table2.c.otherid)).replace_selectable(table2, table2.alias()), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable JOIN myothertable AS myothertable_1 ON mytable.myid = myothertable_1.otherid WHERE EXISTS (SELECT 1 FROM myothertable AS myothertable_1 WHERE myothertable_1.otherid = mytable.myid)" - ) + ) - def testwheresubquery(self): + def test_where_subquery(self): s = select([addresses.c.street], addresses.c.user_id==users.c.user_id, correlate=True).alias('s') self.assert_compile( select([users, s.c.street], from_obj=s), @@ -233,7 +211,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A ) - def testorderbysubquery(self): + def test_orderby_subquery(self): self.assert_compile( table1.select(order_by=[select([table2.c.otherid], table1.c.myid==table2.c.otherid)]), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable ORDER BY (SELECT myothertable.otherid FROM myothertable WHERE mytable.myid = myothertable.otherid)" @@ -325,31 +303,29 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A s2 = select([table1, s1], from_obj=j1) self.assert_compile(s2, "SELECT mytable.myid, mytable.name, mytable.description, (SELECT t2alias.otherid FROM myothertable AS t2alias WHERE mytable.myid = t2alias.otherid) AS anon_1 FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid") - def testlabelcomparison(self): + def test_label_comparison(self): x = func.lala(table1.c.myid).label('foo') self.assert_compile(select([x], x==5), "SELECT lala(mytable.myid) AS foo FROM mytable WHERE lala(mytable.myid) = :param_1") - def testand(self): + def test_conjunctions(self): self.assert_compile( - select(['*'], and_(table1.c.myid == 12, table1.c.name=='asdf', table2.c.othername == 'foo', "sysdate() = today()")), - "SELECT * FROM mytable, myothertable WHERE mytable.myid = :mytable_myid_1 AND mytable.name = :mytable_name_1 "\ + and_(table1.c.myid == 12, table1.c.name=='asdf', table2.c.othername == 'foo', "sysdate() = today()"), + "mytable.myid = :mytable_myid_1 AND mytable.name = :mytable_name_1 "\ "AND myothertable.othername = :myothertable_othername_1 AND sysdate() = today()" ) - def testor(self): self.assert_compile( - select([table1], and_( + and_( table1.c.myid == 12, or_(table2.c.othername=='asdf', table2.c.othername == 'foo', table2.c.otherid == 9), "sysdate() = today()", - )), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable "\ - "WHERE mytable.myid = :mytable_myid_1 AND (myothertable.othername = :myothertable_othername_1 OR "\ + ), + "mytable.myid = :mytable_myid_1 AND (myothertable.othername = :myothertable_othername_1 OR "\ "myothertable.othername = :myothertable_othername_2 OR myothertable.otherid = :myothertable_otherid_1) AND sysdate() = today()", checkparams = {'myothertable_othername_1': 'asdf', 'myothertable_othername_2':'foo', 'myothertable_otherid_1': 9, 'mytable_myid_1': 12} ) - def testdistinct(self): + def test_distinct(self): self.assert_compile( select([table1.c.myid.distinct()]), "SELECT DISTINCT mytable.myid FROM mytable" ) @@ -370,8 +346,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A select([func.count(distinct(table1.c.myid))]), "SELECT count(DISTINCT mytable.myid) AS count_1 FROM mytable" ) - def testoperators(self): - # exercise arithmetic operators + def test_operators(self): for (py_op, sql_op) in ((operator.add, '+'), (operator.mul, '*'), (operator.sub, '-'), (operator.div, '/'), ): @@ -454,6 +429,15 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A clause = (table1.c.myid == 12) & table1.c.myid.between(15, 20) & table1.c.myid.like('hoho') assert str(clause) == str(util.pickle.loads(util.pickle.dumps(clause))) + # ILIKE + stmt = table1.select(table1.c.name.ilike('%something%')) + self.assert_compile(stmt, "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE lower(mytable.name) LIKE lower(:mytable_name_1)") + self.assert_compile(stmt, "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.name ILIKE %(mytable_name_1)s", dialect=postgres.PGDialect()) + + stmt = table1.select(~table1.c.name.ilike('%something%')) + self.assert_compile(stmt, "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE lower(mytable.name) NOT LIKE lower(:mytable_name_1)") + self.assert_compile(stmt, "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.name NOT ILIKE %(mytable_name_1)s", dialect=postgres.PGDialect()) + def test_composed_string_comparators(self): self.assert_compile( table1.c.name.contains('jo'), "mytable.name LIKE '%' || :mytable_name_1 || '%'" , checkparams = {'mytable_name_1': u'jo'}, @@ -486,7 +470,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A "SELECT * FROM mytable WHERE mytable.myid = :mytable_myid_1 OR mytable.myid = :mytable_myid_2 OR mytable.myid = :mytable_myid_3" ) - def testorderby(self): + def test_orderby_groupby(self): self.assert_compile( table2.select(order_by = [table2.c.otherid, asc(table2.c.othername)]), "SELECT myothertable.otherid, myothertable.othername FROM myothertable ORDER BY myothertable.otherid, myothertable.othername ASC" @@ -508,7 +492,6 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A "SELECT myothertable.otherid, myothertable.othername FROM myothertable" ) - def testgroupby(self): self.assert_compile( select([table2.c.othername, func.count(table2.c.otherid)], group_by = [table2.c.othername]), "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 FROM myothertable GROUP BY myothertable.othername" @@ -525,23 +508,12 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 FROM myothertable" ) - - def testgroupby_and_orderby(self): self.assert_compile( select([table2.c.othername, func.count(table2.c.otherid)], group_by = [table2.c.othername], order_by = [table2.c.othername]), "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 FROM myothertable GROUP BY myothertable.othername ORDER BY myothertable.othername" ) - def testilike(self): - stmt = table1.select(table1.c.name.ilike('%something%')) - self.assert_compile(stmt, "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE lower(mytable.name) LIKE lower(:mytable_name_1)") - self.assert_compile(stmt, "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.name ILIKE %(mytable_name_1)s", dialect=postgres.PGDialect()) - - stmt = table1.select(~table1.c.name.ilike('%something%')) - self.assert_compile(stmt, "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE lower(mytable.name) NOT LIKE lower(:mytable_name_1)") - self.assert_compile(stmt, "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.name NOT ILIKE %(mytable_name_1)s", dialect=postgres.PGDialect()) - - def testforupdate(self): + def test_for_update(self): self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid_1 FOR UPDATE") self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid_1 FOR UPDATE") @@ -554,7 +526,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid_1 FOR UPDATE", dialect=oracle.dialect()) - def testalias(self): + def test_alias(self): # test the alias for a table1. column names stay the same, table name "changes" to "foo". self.assert_compile( select([table1.alias('foo')]) @@ -595,7 +567,7 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = "SELECT SQL_CALC_FOUND_ROWS SQL_SOME_WEIRD_MYSQL_THING mytable.myid, mytable.name, mytable.description FROM mytable" ) - def testtext(self): + def test_text(self): self.assert_compile( text("select * from foo where lala = bar") , "select * from foo where lala = bar" @@ -627,7 +599,6 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = s.append_from("table1") self.assert_compile(s, "SELECT column1, column2 FROM table1 WHERE column1=12 AND column2=19 ORDER BY column1") - def testtextcolumns(self): self.assert_compile( select(["column1", "column2"], from_obj=table1).alias('somealias').select(), "SELECT somealias.column1, somealias.column2 FROM (SELECT column1, column2 FROM mytable) AS somealias" @@ -655,7 +626,7 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = "SELECT column1 AS foobar, column2 AS hoho, myid FROM (SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS myid FROM mytable)" ) - def testtextbinds(self): + def test_binds_in_text(self): self.assert_compile( text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar', 4), bindparam('whee', 7)]), "select * from foo where lala=:bar and hoho=:whee", @@ -692,7 +663,6 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = dialect=dialect ) - def testtextmix(self): self.assert_compile(select( [table1, table2.c.otherid, "sysdate()", "foo, bar, lala"], and_( @@ -704,7 +674,6 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, sysdate(), foo, bar, lala \ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today AND mytable.myid = myothertable.otherid") - def testtextualsubquery(self): self.assert_compile(select( [alias(table1, 't'), "foo.f"], "foo.f = t.id", @@ -712,11 +681,11 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today ), "SELECT t.myid, t.name, t.description, foo.f FROM mytable AS t, (select f from bar where lala=heyhey) foo WHERE foo.f = t.id") - def testliteral(self): + def test_literal(self): self.assert_compile(select([literal("foo") + literal("bar")], from_obj=[table1]), "SELECT :param_1 || :param_2 AS anon_1 FROM mytable") - def testcalculatedcolumns(self): + def test_calculated_columns(self): value_tbl = table('values', column('id', Integer), column('val1', Float), @@ -740,75 +709,13 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today "SELECT values.id FROM values WHERE values.val1 / (values.val2 - values.val1) / values.val1 > :param_1" ) - def testfunction(self): - """tests the generation of functions using the func keyword""" - # test an expression with a function - self.assert_compile(func.lala(3, 4, literal("five"), table1.c.myid) * table2.c.otherid, - "lala(:lala_1, :lala_2, :param_1, mytable.myid) * myothertable.otherid") - - # test it in a SELECT - self.assert_compile(select([func.count(table1.c.myid)]), - "SELECT count(mytable.myid) AS count_1 FROM mytable") - - # test a "dotted" function name - self.assert_compile(select([func.foo.bar.lala(table1.c.myid)]), - "SELECT foo.bar.lala(mytable.myid) AS lala_1 FROM mytable") - - # test the bind parameter name with a "dotted" function name is only the name - # (limits the length of the bind param name) - self.assert_compile(select([func.foo.bar.lala(12)]), - "SELECT foo.bar.lala(:lala_2) AS lala_1") - - # test a dotted func off the engine itself - self.assert_compile(func.lala.hoho(7), "lala.hoho(:hoho_1)") - - # test None becomes NULL - self.assert_compile(func.my_func(1,2,None,3), "my_func(:my_func_1, :my_func_2, NULL, :my_func_3)") - - # test pickling - self.assert_compile(util.pickle.loads(util.pickle.dumps(func.my_func(1, 2, None, 3))), "my_func(:my_func_1, :my_func_2, NULL, :my_func_3)") - - # assert func raises AttributeError for __bases__ attribute, since its not a class - # fixes pydoc - try: - func.__bases__ - assert False - except AttributeError: - assert True - - def test_functions_with_cols(self): - from sqlalchemy.sql import column - users = table('users', column('id'), column('name'), column('fullname')) - calculate = select([column('q'), column('z'), column('r')], - from_obj=[func.calculate(bindparam('x'), bindparam('y'))]) - - self.assert_compile(select([users], users.c.id > calculate.c.z), - "SELECT users.id, users.name, users.fullname " - "FROM users, (SELECT q, z, r " - "FROM calculate(:x, :y)) " - "WHERE users.id > z" - ) - - print "--------------------------------------------------" - s = select([users], users.c.id.between( - calculate.alias('c1').unique_params(x=17, y=45).c.z, - calculate.alias('c2').unique_params(x=5, y=12).c.z)) - - self.assert_compile(s, - "SELECT users.id, users.name, users.fullname " - "FROM users, (SELECT q, z, r " - "FROM calculate(:x_1, :y_1)) AS c1, (SELECT q, z, r " - "FROM calculate(:x_2, :y_2)) AS c2 " - "WHERE users.id BETWEEN c1.z AND c2.z" - , checkparams={'y_1': 45, 'x_1': 17, 'y_2': 12, 'x_2': 5}) - - def testextract(self): + def test_extract(self): """test the EXTRACT function""" self.assert_compile(select([extract("month", table3.c.otherstuff)]), "SELECT extract(month FROM thirdtable.otherstuff) AS extract_1 FROM thirdtable") self.assert_compile(select([extract("day", func.to_date("03/20/2005", "MM/DD/YYYY"))]), "SELECT extract(day FROM to_date(:to_date_1, :to_date_2)) AS extract_1") - def testjoin(self): + def test_joins(self): self.assert_compile( join(table2, table1, table1.c.myid == table2.c.otherid).select(), "SELECT myothertable.otherid, myothertable.othername, mytable.myid, mytable.name, \ @@ -834,7 +741,6 @@ mytable.description FROM myothertable JOIN mytable ON mytable.myid = myothertabl "SELECT users.user_id, users.user_name, users.password, addresses.address_id, addresses.user_id, addresses.street, addresses.city, addresses.state, addresses.zip FROM users JOIN addresses ON users.user_id = addresses.user_id" ) - def testmultijoin(self): self.assert_compile( select([table1, table2, table3], @@ -851,7 +757,25 @@ mytable.description FROM myothertable JOIN mytable ON mytable.myid = myothertabl ,"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable LEFT OUTER JOIN (myothertable JOIN thirdtable ON myothertable.otherid = thirdtable.userid) ON mytable.myid = myothertable.otherid" ) - def testunion(self): + query = select( + [table1, table2], + or_( + table1.c.name == 'fred', + table1.c.myid == 10, + table2.c.othername != 'jack', + "EXISTS (select yay from foo where boo = lar)" + ), + from_obj = [ outerjoin(table1, table2, table1.c.myid == table2.c.otherid) ] + ) + self.assert_compile(query, + "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \ +FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid = myothertable.otherid \ +WHERE mytable.name = :mytable_name_1 OR mytable.myid = :mytable_myid_1 OR \ +myothertable.othername != :myothertable_othername_1 OR \ +EXISTS (select yay from foo where boo = lar)", + ) + + def test_compound_selects(self): try: union(table3.select(), table1.select()) except exceptions.ArgumentError, err: @@ -913,8 +837,6 @@ FROM mytable WHERE mytable.name = :mytable_name_2" "SELECT :param_1 AS value UNION SELECT :param_2 AS value" ) - - def test_compound_select_grouping(self): self.assert_compile( union_all( select([table1.c.myid]), @@ -941,28 +863,8 @@ SELECT thirdtable.userid FROM thirdtable)" UNION SELECT mytable.myid FROM mytable" ) - def testouterjoin(self): - query = select( - [table1, table2], - or_( - table1.c.name == 'fred', - table1.c.myid == 10, - table2.c.othername != 'jack', - "EXISTS (select yay from foo where boo = lar)" - ), - from_obj = [ outerjoin(table1, table2, table1.c.myid == table2.c.otherid) ] - ) - self.assert_compile(query, - "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \ -FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid = myothertable.otherid \ -WHERE mytable.name = %(mytable_name_1)s OR mytable.myid = %(mytable_myid_1)s OR \ -myothertable.othername != %(myothertable_othername_1)s OR \ -EXISTS (select yay from foo where boo = lar)", - dialect=postgres.dialect() - ) - @testing.uses_deprecated('//get_params') - def testbindparam(self): + def test_binds(self): for ( stmt, expected_named_stmt, @@ -1073,7 +975,6 @@ EXISTS (select yay from foo where boo = lar)", except exceptions.CompileError, err: assert str(err) == "Bind parameter 'mytable_myid_1' conflicts with unique bind parameter of the same name" - def test_bind_as_col(self): t = table('foo', column('id')) @@ -1298,7 +1199,7 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE self.assert_compile(s1, "SELECT %s FROM (SELECT %s FROM mytable)" % (expr,expr)) class CRUDTest(SQLCompileTest): - def testinsert(self): + def test_insert(self): # generic insert, will create bind params for all columns self.assert_compile(insert(table1), "INSERT INTO mytable (myid, name, description) VALUES (:myid, :name, :description)") @@ -1327,7 +1228,9 @@ class CRUDTest(SQLCompileTest): "INSERT INTO mytable (myid, name) VALUES (:userid, :username)" ) - def testinlineinsert(self): + self.assert_compile(insert(table1, values=dict(myid=func.lala())), "INSERT INTO mytable (myid) VALUES (lala())") + + def test_inline_insert(self): metadata = MetaData() table = Table('sometable', metadata, Column('id', Integer, primary_key=True), @@ -1335,10 +1238,7 @@ class CRUDTest(SQLCompileTest): self.assert_compile(table.insert(values={}, inline=True), "INSERT INTO sometable (foo) VALUES (foobar())") self.assert_compile(table.insert(inline=True), "INSERT INTO sometable (foo) VALUES (foobar())", params={}) - def testinsertexpression(self): - self.assert_compile(insert(table1, values=dict(myid=func.lala())), "INSERT INTO mytable (myid) VALUES (lala())") - - def testupdate(self): + def test_update(self): self.assert_compile(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :mytable_myid_1", params = {table1.c.name:'fred'}) self.assert_compile(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :mytable_myid_1", params = {'name':'fred'}) self.assert_compile(update(table1, values = {table1.c.name : table1.c.myid}), "UPDATE mytable SET name=mytable.myid") @@ -1351,7 +1251,6 @@ class CRUDTest(SQLCompileTest): self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.name : table1.c.myid}).values({table1.c.name:table1.c.name + 'foo'}), "UPDATE mytable SET name=(mytable.name || :mytable_name_1), description=:description WHERE mytable.myid = :mytable_myid_1", params = {'description':'test'}) self.assert_(str(s) == str(c)) - def testupdateexpression(self): self.assert_compile(update(table1, (table1.c.myid == func.hoho(4)) & (table1.c.name == literal('foo') + table1.c.name + literal('lala')), @@ -1361,7 +1260,7 @@ class CRUDTest(SQLCompileTest): }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :param_1), name=(mytable.name || :mytable_name_1) " "WHERE mytable.myid = hoho(:hoho_1) AND mytable.name = :param_2 || mytable.name || :param_3") - def testcorrelatedupdate(self): + def test_correlated_update(self): # test against a straight text subquery u = update(table1, values = {table1.c.name : text("(select name from mytable where id=mytable.id)")}) self.assert_compile(u, "UPDATE mytable SET name=(select name from mytable where id=mytable.id)") @@ -1387,10 +1286,10 @@ class CRUDTest(SQLCompileTest): self.assert_compile(u, "UPDATE mytable SET myid=:myid, name=:name, description=:description WHERE mytable.name = "\ "(SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid)") - def testdelete(self): + def test_delete(self): self.assert_compile(delete(table1, table1.c.myid == 7), "DELETE FROM mytable WHERE mytable.myid = :mytable_myid_1") - def testcorrelateddelete(self): + def test_correlated_delete(self): # test a non-correlated WHERE clause s = select([table2.c.othername], table2.c.otherid == 7) u = delete(table1, table1.c.name==s) @@ -1429,7 +1328,8 @@ class InlineDefaultTest(SQLCompileTest): self.assert_compile(t.update(inline=True, values={'col3':'foo'}), "UPDATE test SET col1=foo(:foo_1), col2=(SELECT coalesce(max(foo.id)) AS coalesce_1 FROM foo), col3=:col3") class SchemaTest(SQLCompileTest): - def testselect(self): + @testing.fails_on('mssql') + def test_select(self): # these tests will fail with the MS-SQL compiler since it will alias schema-qualified tables self.assert_compile(table4.select(), "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable") self.assert_compile(table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')), @@ -1442,16 +1342,16 @@ class SchemaTest(SQLCompileTest): "AS remotetable_value FROM remote_owner.remotetable WHERE "\ "remotetable.datatype_id = :remotetable_datatype_id_1 AND remotetable.value = :remotetable_value_1") - def testalias(self): + def test_alias(self): a = alias(table4, 'remtable') self.assert_compile(a.select(a.c.datatype_id==7), "SELECT remtable.rem_id, remtable.datatype_id, remtable.value FROM remote_owner.remotetable AS remtable "\ "WHERE remtable.datatype_id = :remtable_datatype_id_1") - def testupdate(self): + def test_update(self): self.assert_compile(table4.update(table4.c.value=='test', values={table4.c.datatype_id:12}), "UPDATE remote_owner.remotetable SET datatype_id=:datatype_id "\ "WHERE remotetable.value = :remotetable_value_1") - def testinsert(self): + def test_insert(self): self.assert_compile(table4.insert(values=(2, 5, 'test')), "INSERT INTO remote_owner.remotetable (rem_id, datatype_id, value) VALUES "\ "(:rem_id, :datatype_id, :value)")