From: Mike Bayer Date: Fri, 1 Jul 2005 05:38:16 +0000 (+0000) Subject: Initial revision X-Git-Tag: rel_0_1_0~918 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=dba8fa9145df723e5628bfa998903039db4010c1;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git Initial revision --- diff --git a/test/select.py b/test/select.py new file mode 100644 index 0000000000..fe6a57f700 --- /dev/null +++ b/test/select.py @@ -0,0 +1,408 @@ + +import sqlalchemy.ansisql as ansisql +import sqlalchemy.databases.postgres as postgres +import sqlalchemy.databases.oracle as oracle + +db = ansisql.engine() + +from sqlalchemy.sql import * +from sqlalchemy.schema import * + +from testbase import PersistTest +import unittest + +class SelectTest(PersistTest): + + def setUp(self): + + self.table = Table('mytable', db, + Column('myid', 3, key = 'id'), + Column('name', 4, key = 'name'), + Column('description', 4, key = 'description'), + ) + + self.table2 = Table( + 'myothertable', db, + Column('otherid',3, key='id'), + Column('othername', 4, key='name'), + ) + + self.table3 = Table( + 'thirdtable', db, + Column('userid', 5, key='id'), + Column('otherstuff', 5), + ) + + + def testoperator(self): + return + table = Table( + 'mytable', + Column('myid',3, key='id'), + Column('name', 4) + ) + + print (table.c.id == 5) + + def testtext(self): + self.runtest( + textclause("select * from foo where lala = bar", {}) , + "select * from foo where lala = bar", + engine = db + ) + + def testtableselect(self): + self.runtest(self.table.select(), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable") + + self.runtest(select([self.table, self.table2]), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, \ +myothertable.othername FROM mytable, myothertable") + + def testsubquery(self): + + s = select([self.table], self.table.c.name == 'jack') + self.runtest( + select( + [s], + s.c.id == 7 + ) + , + "SELECT myid, name, description FROM (SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.name = :mytable_name) WHERE myid = :myid") + + sq = Select([self.table]) + self.runtest( + sq.select(), + "SELECT myid, name, description FROM (SELECT mytable.myid, mytable.name, mytable.description FROM mytable)" + ) + + sq = subquery( + 'sq', + [self.table], + ) + + self.runtest( + sq.select(sq.c.id == 7), + "SELECT sq.myid, sq.name, sq.description FROM \ +(SELECT mytable.myid, mytable.name, mytable.description FROM mytable) sq WHERE sq.myid = :sq_myid" + ) + + sq = subquery( + 'sq', + [self.table, self.table2], + and_(self.table.c.id ==7, self.table2.c.id==self.table.c.id), + use_labels = True + ) + + sqstring = "SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, \ +mytable.description AS mytable_description, myothertable.otherid AS myothertable_otherid, \ +myothertable.othername AS myothertable_othername FROM mytable, myothertable \ +WHERE mytable.myid = :mytable_myid AND myothertable.otherid = mytable.myid" + + self.runtest(sq.select(), "SELECT sq.mytable_myid, sq.mytable_name, sq.mytable_description, sq.myothertable_otherid, \ +sq.myothertable_othername FROM (" + sqstring + ") sq") + + sq2 = subquery( + 'sq2', + [sq], + use_labels = True + ) + + self.runtest(sq2.select(), "SELECT sq2.sq_mytable_myid, sq2.sq_mytable_name, sq2.sq_mytable_description, \ +sq2.sq_myothertable_otherid, sq2.sq_myothertable_othername FROM \ +(SELECT sq.mytable_myid AS sq_mytable_myid, sq.mytable_name AS sq_mytable_name, \ +sq.mytable_description AS sq_mytable_description, sq.myothertable_otherid AS sq_myothertable_otherid, \ +sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") sq) sq2") + + + def testand(self): + self.runtest( + select(['*'], and_(self.table.c.id == 12, self.table.c.name=='asdf', self.table2.c.name == 'foo', "sysdate() = today()")), + "SELECT * FROM mytable, myothertable WHERE mytable.myid = :mytable_myid AND mytable.name = :mytable_name AND myothertable.othername = :myothertable_othername AND sysdate() = today()" + ) + + def testor(self): + self.runtest( + select([self.table], and_( + self.table.c.id == 12, + or_(self.table2.c.name=='asdf', self.table2.c.name == 'foo', self.table2.c.id == 9), + "sysdate() = today()", + )), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :mytable_myid AND (myothertable.othername = :myothertable_othername OR myothertable.othername = :myothertable_othername_1 OR myothertable.otherid = :myothertable_otherid) AND sysdate() = today()" + ) + + + def testmultiparam(self): + self.runtest( + select(["*"], or_(self.table.c.id == 12, self.table.c.id=='asdf', self.table.c.id == 'foo')), + "SELECT * FROM mytable WHERE mytable.myid = :mytable_myid OR mytable.myid = :mytable_myid_1 OR mytable.myid = :mytable_myid_2" + ) + + def testorderby(self): + self.runtest( + self.table2.select(order_by = [self.table2.c.id, asc(self.table2.c.name)]), + "SELECT myothertable.otherid, myothertable.othername FROM myothertable ORDER BY myothertable.otherid, myothertable.othername ASC" + ) + def testalias(self): + # test the alias for a table. column names stay the same, table name "changes" to "foo". + self.runtest( + select([alias(self.table, 'foo')]) + ,"SELECT foo.myid, foo.name, foo.description FROM mytable foo") + + # create a select for a join of two tables. use_labels means the column names will have + # labels tablename_columnname, which become the column keys accessible off the Selectable object. + # also, only use one column from the second table and all columns from the first table. + q = select([self.table, self.table2.c.id], self.table.c.id == self.table2.c.id, use_labels = True) + + # make an alias of the "selectable". column names stay the same (i.e. the labels), table name "changes" to "t2view". + a = alias(q, 't2view') + + # select from that alias, also using labels. two levels of labels should produce two underscores. + # also, reference the column "mytable_myid" off of the t2view alias. + self.runtest( + a.select(a.c.mytable_myid == 9, use_labels = True), + "SELECT t2view.mytable_myid AS t2view_mytable_myid, t2view.mytable_name AS t2view_mytable_name, \ +t2view.mytable_description AS t2view_mytable_description, t2view.myothertable_otherid AS t2view_myothertable_otherid FROM \ +(SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, mytable.description AS mytable_description, \ +myothertable.otherid AS myothertable_otherid FROM mytable, myothertable \ +WHERE mytable.myid = myothertable.otherid) t2view WHERE t2view.mytable_myid = :t2view_mytable_myid" + ) + + def testliteral(self): + self.runtest(select( + ["foobar(a)", "pk_foo_bar(syslaal)"], + "a = 12", + from_obj = ["foobar left outer join lala on foobar.foo = lala.foo"], + engine = db + ), + "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar left outer join lala on foobar.foo = lala.foo WHERE a = 12") + + def testliteralmix(self): + self.runtest(select( + [self.table, self.table2.c.id, "sysdate()", "foo, bar, lala"], + and_( + "foo.id = foofoo(lala)", + "datetime(foo) = Today", + self.table.c.id == self.table2.c.id, + ) + ), + "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, sysdate(), foo, bar, lala \ +FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today AND mytable.myid = myothertable.otherid") + + def testliteralsubquery(self): + self.runtest(select( + [alias(self.table, 't'), "foo.f"], + "foo.f = t.id", + from_obj = ["(select f from bar where lala=heyhey) foo"] + ), + "SELECT t.myid, t.name, t.description, foo.f FROM mytable t, (select f from bar where lala=heyhey) foo WHERE foo.f = t.id") + + def testjoin(self): + self.runtest( + join(self.table2, self.table, self.table.c.id == self.table2.c.id).select(), + "SELECT myothertable.otherid, myothertable.othername, mytable.myid, mytable.name, mytable.description \ +FROM myothertable, mytable WHERE mytable.myid = myothertable.otherid" + ) + + self.runtest( + select( + [self.table], + from_obj = [join(self.table, self.table2, self.table.c.id == self.table2.c.id)] + ), + "SELECT mytable.myid, mytable.name, mytable.description FROM (mytable JOIN myothertable ON mytable.myid = myothertable.otherid)") + + self.runtest( + select( + [join(self.table, join(self.table2, self.table3, self.table2.c.id == self.table3.c.id), self.table.c.id == self.table2.c.id), + + ]), + "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, \ +myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM \ +(mytable JOIN (myothertable JOIN thirdtable ON myothertable.otherid = thirdtable.userid) ON mytable.myid = myothertable.otherid)" + ) + + + def testunion(self): + x = union( + select([self.table], self.table.c.id == 5), + select([self.table], self.table.c.id == 12), + order_by = [self.table.c.id], + ) + + self.runtest(x, "SELECT mytable.myid, mytable.name, mytable.description \ +FROM mytable WHERE mytable.myid = :mytable_myid UNION \ +SELECT mytable.myid, mytable.name, mytable.description \ +FROM mytable WHERE mytable.myid = :mytable_myid_1 ORDER BY mytable.myid") + + self.runtest( + union( + select([self.table]), + select([self.table2]), + select([self.table3]) + ) + , + "SELECT mytable.myid, mytable.name, mytable.description \ +FROM mytable UNION SELECT myothertable.otherid, myothertable.othername \ +FROM myothertable UNION SELECT thirdtable.userid, thirdtable.otherstuff FROM thirdtable") + + + def testouterjoin(self): + # test an outer join. the oracle module should take the ON clause of the join and + # move it up to the WHERE clause of its parent select, and append (+) to all right-hand-side columns + # within the original onclause, but leave right-hand-side columns unchanged outside of the onclause + # parameters. + + query = select( + [self.table, self.table2], + and_( + self.table.c.name == 'fred', + self.table.c.id == 10, + self.table2.c.name != 'jack', + "EXISTS (select yay from foo where boo = lar)" + ), + from_obj = [ outerjoin(self.table, self.table2, self.table.c.id == self.table2.c.id) ] + ) + + self.runtest(query, + "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \ +FROM (mytable LEFT OUTER JOIN myothertable ON mytable.myid = myothertable.otherid) \ +WHERE mytable.name = :mytable_name AND mytable.myid = :mytable_myid AND \ +myothertable.othername != :myothertable_othername AND \ +EXISTS (select yay from foo where boo = lar)", + engine = postgres.engine()) + + + self.runtest(query, + "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \ +FROM mytable, myothertable WHERE mytable.myid = myothertable.otherid(+) AND \ +mytable.name = :mytable_name AND mytable.myid = :mytable_myid AND \ +myothertable.othername != :myothertable_othername AND EXISTS (select yay from foo where boo = lar)", + engine = oracle.engine(use_ansi = False)) + + + + def testbindparam(self): + #return + self.runtest(select( + [self.table, self.table2], + and_(self.table.c.id == self.table2.c.id, + self.table.c.name == bindparam('mytablename'), + ) + ), + "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \ +FROM mytable, myothertable WHERE mytable.myid = myothertable.otherid AND mytable.name = :mytablename" + ) + + + def testinsert(self): + # generic insert, will create bind params for all columns + self.runtest(insert(self.table), "INSERT INTO mytable (myid, name, description) VALUES (:myid, :name, :description)") + + # insert with user-supplied bind params for specific columns, + # cols provided literally + self.runtest( + insert(self.table, {self.table.c.id : bindparam('userid'), self.table.c.name : bindparam('username')}), + "INSERT INTO mytable (myid, name) VALUES (:userid, :username)") + + # insert with user-supplied bind params for specific columns, cols + # provided as strings + self.runtest( + insert(self.table, dict(id = 3, name = 'jack')), + "" + ) + + # insert with a subselect provided + self.runtest( + insert(self.table, select([self.table2])), + "" + ) + + def footestupdate(self): + self.runtest( + update(self.table, {self.table.c.id : select([self.table2.c.id], self.table2.c.name == 'jack')}) + ) + + def footestonetomany(self): + return + + table1 = Table('users', + Column('user_id',3, key='id'), + Column('user_name', 4, key='name'), + Column('user_email', 4, key='desc'), + ) + + table2 = Table('orders', + Column('order_id',3, key='id'), + Column('user_id', 4), + Column('information', 4), + ) + + Relation(table1, table2, table2.c.user_id == table1.c.id, lazy = False) + + self.runtest( + table1.select(includerelations = True, use_labels = True), + "SELECT users.user_id, users.user_name, users.user_email, orders.order_id, orders.user_id, orders.information FROM users LEFT OUTER JOIN orders ON orders.user_id = users.user_id" + ) + + def footestmanytomany(self): + return + + table2 = Table('clubs', + Column('club_id',3, key='id'), + Column('club_name', 4, key='name'), + Column('club_description', 4, key='description'), + ) + + table1 = Table('users', + Column('user_id',3, key='id'), + Column('user_name', 4, key='name'), + Column('user_email', 4, key='desc'), + ) + + table3 = Table('user_clubs', + Column('user_id', 3), + Column('club_id', 4) + ) + + sq = join(table2, table3, table2.c.id==table3.c.user_id) + + Relation(table1, table2, table1.c.id==table3.c.user_id, + association = Relation(table3, table2, table3.c.club_id==table2.c.id), + lazy = False) + + self.runtest( + table1.select(includerelations = True), + "" + ) + + + def donttesttableselect(self): + + print select( + [table2, sq], + sq.c.name == table2.c.name + ) + + + + + print table.select(table.c.id == 'hithere').dump() + + print table.select(and_(table2.c.id == table.c.id, table2.c.id == 'hilar')).dump() + + print table.select(and_(table.c.id < 5, table.c.name == 'hilar')).dump() + + + + print repr(table.impl.params) + + print table.select(None).dump() + + u2 = alias(table, 'u2') + + + def runtest(self, clause, result, engine = None): + c = clause.compile(engine) + print "\n" + str(c) + repr(c.get_params()) + self.assert_(str(c) == result) + +if __name__ == "__main__": + unittest.main()