]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Initial revision
authorMike Bayer <mike_mp@zzzcomputing.com>
Fri, 1 Jul 2005 05:38:16 +0000 (05:38 +0000)
committerMike Bayer <mike_mp@zzzcomputing.com>
Fri, 1 Jul 2005 05:38:16 +0000 (05:38 +0000)
test/select.py [new file with mode: 0644]

diff --git a/test/select.py b/test/select.py
new file mode 100644 (file)
index 0000000..fe6a57f
--- /dev/null
@@ -0,0 +1,408 @@
+
+import sqlalchemy.ansisql as ansisql
+import sqlalchemy.databases.postgres as postgres
+import sqlalchemy.databases.oracle as oracle
+
+db = ansisql.engine()
+
+from sqlalchemy.sql import *
+from sqlalchemy.schema import *
+
+from testbase import PersistTest
+import unittest
+
+class SelectTest(PersistTest):
+    
+    def setUp(self):
+
+        self.table = Table('mytable', db,
+            Column('myid', 3, key = 'id'),
+            Column('name', 4, key = 'name'),
+            Column('description', 4, key = 'description'),
+        )
+
+        self.table2 = Table(
+            'myothertable', db,
+            Column('otherid',3, key='id'),
+            Column('othername', 4, key='name'),
+        )
+
+        self.table3 = Table(
+            'thirdtable', db,
+            Column('userid', 5, key='id'),
+            Column('otherstuff', 5),
+        )
+
+    
+    def testoperator(self):
+        return
+        table = Table(
+            'mytable',
+            Column('myid',3, key='id'),
+            Column('name', 4)
+        )
+
+        print (table.c.id == 5)
+
+    def testtext(self):
+        self.runtest(
+            textclause("select * from foo where lala = bar", {}) ,
+            "select * from foo where lala = bar",
+            engine = db
+        )
+    
+    def testtableselect(self):
+        self.runtest(self.table.select(), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable")
+
+        self.runtest(select([self.table, self.table2]), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, \
+myothertable.othername FROM mytable, myothertable")
+        
+    def testsubquery(self):
+    
+        s = select([self.table], self.table.c.name == 'jack')    
+        self.runtest(
+            select(
+                [s],
+                s.c.id == 7
+            )
+            ,
+        "SELECT myid, name, description FROM (SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.name = :mytable_name) WHERE myid = :myid")
+        
+        sq = Select([self.table])
+        self.runtest(
+            sq.select(),
+            "SELECT myid, name, description FROM (SELECT mytable.myid, mytable.name, mytable.description FROM mytable)"
+        )
+        
+        sq = subquery(
+            'sq',
+            [self.table],
+        )
+
+        self.runtest(
+            sq.select(sq.c.id == 7), 
+            "SELECT sq.myid, sq.name, sq.description FROM \
+(SELECT mytable.myid, mytable.name, mytable.description FROM mytable) sq WHERE sq.myid = :sq_myid"
+        )
+        
+        sq = subquery(
+            'sq',
+            [self.table, self.table2],
+            and_(self.table.c.id ==7, self.table2.c.id==self.table.c.id),
+            use_labels = True
+        )
+        
+        sqstring = "SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, \
+mytable.description AS mytable_description, myothertable.otherid AS myothertable_otherid, \
+myothertable.othername AS myothertable_othername FROM mytable, myothertable \
+WHERE mytable.myid = :mytable_myid AND myothertable.otherid = mytable.myid"
+
+        self.runtest(sq.select(), "SELECT sq.mytable_myid, sq.mytable_name, sq.mytable_description, sq.myothertable_otherid, \
+sq.myothertable_othername FROM (" + sqstring + ") sq")
+
+        sq2 = subquery(
+            'sq2',
+            [sq],
+            use_labels = True
+        )
+
+        self.runtest(sq2.select(), "SELECT sq2.sq_mytable_myid, sq2.sq_mytable_name, sq2.sq_mytable_description, \
+sq2.sq_myothertable_otherid, sq2.sq_myothertable_othername FROM \
+(SELECT sq.mytable_myid AS sq_mytable_myid, sq.mytable_name AS sq_mytable_name, \
+sq.mytable_description AS sq_mytable_description, sq.myothertable_otherid AS sq_myothertable_otherid, \
+sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") sq) sq2")
+        
+        
+    def testand(self):
+        self.runtest(
+            select(['*'], and_(self.table.c.id == 12, self.table.c.name=='asdf', self.table2.c.name == 'foo', "sysdate() = today()")), 
+            "SELECT * FROM mytable, myothertable WHERE mytable.myid = :mytable_myid AND mytable.name = :mytable_name AND myothertable.othername = :myothertable_othername AND sysdate() = today()"
+        )
+
+    def testor(self):
+        self.runtest(
+            select([self.table], and_(
+                self.table.c.id == 12,
+                or_(self.table2.c.name=='asdf', self.table2.c.name == 'foo', self.table2.c.id == 9),
+                "sysdate() = today()", 
+            )),
+            "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :mytable_myid AND (myothertable.othername = :myothertable_othername OR myothertable.othername = :myothertable_othername_1 OR myothertable.otherid = :myothertable_otherid) AND sysdate() = today()"
+        )
+
+
+    def testmultiparam(self):
+        self.runtest(
+            select(["*"], or_(self.table.c.id == 12, self.table.c.id=='asdf', self.table.c.id == 'foo')), 
+            "SELECT * FROM mytable WHERE mytable.myid = :mytable_myid OR mytable.myid = :mytable_myid_1 OR mytable.myid = :mytable_myid_2"
+        )
+
+    def testorderby(self):
+        self.runtest(
+            self.table2.select(order_by = [self.table2.c.id, asc(self.table2.c.name)]),
+            "SELECT myothertable.otherid, myothertable.othername FROM myothertable ORDER BY myothertable.otherid, myothertable.othername ASC"
+        )
+    def testalias(self):
+        # test the alias for a table.  column names stay the same, table name "changes" to "foo".
+        self.runtest(
+        select([alias(self.table, 'foo')])
+        ,"SELECT foo.myid, foo.name, foo.description FROM mytable foo")
+    
+        # create a select for a join of two tables.  use_labels means the column names will have
+        # labels tablename_columnname, which become the column keys accessible off the Selectable object.
+        # also, only use one column from the second table and all columns from the first table.
+        q = select([self.table, self.table2.c.id], self.table.c.id == self.table2.c.id, use_labels = True)
+        
+        # make an alias of the "selectable".  column names stay the same (i.e. the labels), table name "changes" to "t2view".
+        a = alias(q, 't2view')
+
+        # select from that alias, also using labels.  two levels of labels should produce two underscores.
+        # also, reference the column "mytable_myid" off of the t2view alias.
+        self.runtest(
+            a.select(a.c.mytable_myid == 9, use_labels = True),
+            "SELECT t2view.mytable_myid AS t2view_mytable_myid, t2view.mytable_name AS t2view_mytable_name, \
+t2view.mytable_description AS t2view_mytable_description, t2view.myothertable_otherid AS t2view_myothertable_otherid FROM \
+(SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, mytable.description AS mytable_description, \
+myothertable.otherid AS myothertable_otherid FROM mytable, myothertable \
+WHERE mytable.myid = myothertable.otherid) t2view WHERE t2view.mytable_myid = :t2view_mytable_myid"
+        )
+        
+    def testliteral(self):        
+        self.runtest(select(
+            ["foobar(a)", "pk_foo_bar(syslaal)"],
+            "a = 12",
+            from_obj = ["foobar left outer join lala on foobar.foo = lala.foo"],
+            engine = db
+        ), 
+        "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar left outer join lala on foobar.foo = lala.foo WHERE a = 12")
+
+    def testliteralmix(self):
+        self.runtest(select(
+            [self.table, self.table2.c.id, "sysdate()", "foo, bar, lala"],
+            and_(
+                "foo.id = foofoo(lala)",
+                "datetime(foo) = Today",
+                self.table.c.id == self.table2.c.id,
+            )
+        ), 
+        "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, sysdate(), foo, bar, lala \
+FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today AND mytable.myid = myothertable.otherid")
+
+    def testliteralsubquery(self):
+        self.runtest(select(
+            [alias(self.table, 't'), "foo.f"],
+            "foo.f = t.id",
+            from_obj = ["(select f from bar where lala=heyhey) foo"]
+        ), 
+        "SELECT t.myid, t.name, t.description, foo.f FROM mytable t, (select f from bar where lala=heyhey) foo WHERE foo.f = t.id")
+
+    def  testjoin(self):
+        self.runtest(
+            join(self.table2, self.table, self.table.c.id == self.table2.c.id).select(),
+            "SELECT myothertable.otherid, myothertable.othername, mytable.myid, mytable.name, mytable.description \
+FROM myothertable, mytable WHERE mytable.myid = myothertable.otherid"
+        )
+        
+        self.runtest(
+            select(
+                [self.table],
+                from_obj = [join(self.table, self.table2, self.table.c.id == self.table2.c.id)]
+            ),
+        "SELECT mytable.myid, mytable.name, mytable.description FROM (mytable JOIN myothertable ON mytable.myid = myothertable.otherid)")
+        
+        self.runtest(
+            select(
+                [join(self.table, join(self.table2, self.table3, self.table2.c.id == self.table3.c.id), self.table.c.id == self.table2.c.id),
+                
+            ]),
+            "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, \
+myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM \
+(mytable JOIN (myothertable JOIN thirdtable ON myothertable.otherid = thirdtable.userid) ON mytable.myid = myothertable.otherid)"
+        )
+        
+        
+    def testunion(self):
+            x = union(
+                  select([self.table], self.table.c.id == 5),
+                  select([self.table], self.table.c.id == 12),
+                  order_by = [self.table.c.id],
+            )
+  
+            self.runtest(x, "SELECT mytable.myid, mytable.name, mytable.description \
+FROM mytable WHERE mytable.myid = :mytable_myid UNION \
+SELECT mytable.myid, mytable.name, mytable.description \
+FROM mytable WHERE mytable.myid = :mytable_myid_1 ORDER BY mytable.myid")
+  
+            self.runtest(
+                    union(
+                        select([self.table]),
+                        select([self.table2]),
+                        select([self.table3])
+                    )
+            ,
+            "SELECT mytable.myid, mytable.name, mytable.description \
+FROM mytable UNION SELECT myothertable.otherid, myothertable.othername \
+FROM myothertable UNION SELECT thirdtable.userid, thirdtable.otherstuff FROM thirdtable")
+            
+            
+    def testouterjoin(self):
+        # test an outer join.  the oracle module should take the ON clause of the join and
+        # move it up to the WHERE clause of its parent select, and append (+) to all right-hand-side columns
+        # within the original onclause, but leave right-hand-side columns unchanged outside of the onclause
+        # parameters.
+        
+        query = select(
+                [self.table, self.table2],
+                and_(
+                    self.table.c.name == 'fred',
+                    self.table.c.id == 10,
+                    self.table2.c.name != 'jack',
+                    "EXISTS (select yay from foo where boo = lar)"
+                ),
+                from_obj = [ outerjoin(self.table, self.table2, self.table.c.id == self.table2.c.id) ]
+                )
+                
+        self.runtest(query, 
+            "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \
+FROM (mytable LEFT OUTER JOIN myothertable ON mytable.myid = myothertable.otherid) \
+WHERE mytable.name = :mytable_name AND mytable.myid = :mytable_myid AND \
+myothertable.othername != :myothertable_othername AND \
+EXISTS (select yay from foo where boo = lar)",
+            engine = postgres.engine())
+
+
+        self.runtest(query, 
+            "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \
+FROM mytable, myothertable WHERE mytable.myid = myothertable.otherid(+) AND \
+mytable.name = :mytable_name AND mytable.myid = :mytable_myid AND \
+myothertable.othername != :myothertable_othername AND EXISTS (select yay from foo where boo = lar)",
+            engine = oracle.engine(use_ansi = False))
+
+
+
+    def testbindparam(self):
+        #return
+        self.runtest(select(
+                    [self.table, self.table2],
+                    and_(self.table.c.id == self.table2.c.id,
+                    self.table.c.name == bindparam('mytablename'),
+                    )
+                ),
+                "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \
+FROM mytable, myothertable WHERE mytable.myid = myothertable.otherid AND mytable.name = :mytablename"
+                )
+
+
+    def testinsert(self):
+        # generic insert, will create bind params for all columns
+        self.runtest(insert(self.table), "INSERT INTO mytable (myid, name, description) VALUES (:myid, :name, :description)")
+
+        # insert with user-supplied bind params for specific columns,
+        # cols provided literally
+        self.runtest(
+            insert(self.table, {self.table.c.id : bindparam('userid'), self.table.c.name : bindparam('username')}), 
+            "INSERT INTO mytable (myid, name) VALUES (:userid, :username)")
+        
+        # insert with user-supplied bind params for specific columns, cols
+        # provided as strings
+        self.runtest(
+            insert(self.table, dict(id = 3, name = 'jack')), 
+            ""
+        )
+        
+        # insert with a subselect provided 
+        self.runtest(
+            insert(self.table, select([self.table2])),
+            ""
+        )
+        
+    def footestupdate(self):
+        self.runtest(
+            update(self.table, {self.table.c.id : select([self.table2.c.id], self.table2.c.name == 'jack')})
+        )
+        
+    def footestonetomany(self):
+        return
+        
+        table1 = Table('users',
+            Column('user_id',3, key='id'),
+            Column('user_name', 4, key='name'),
+            Column('user_email', 4, key='desc'),
+        )
+
+        table2 = Table('orders',
+            Column('order_id',3, key='id'),
+            Column('user_id', 4),
+            Column('information', 4),
+        )
+        
+        Relation(table1, table2, table2.c.user_id == table1.c.id, lazy = False)
+
+        self.runtest(
+            table1.select(includerelations = True, use_labels = True),
+            "SELECT users.user_id, users.user_name, users.user_email, orders.order_id, orders.user_id, orders.information FROM users LEFT OUTER JOIN orders ON orders.user_id = users.user_id"
+        )
+
+    def footestmanytomany(self):
+        return
+
+        table2 = Table('clubs',
+            Column('club_id',3, key='id'),
+            Column('club_name', 4, key='name'),
+            Column('club_description', 4, key='description'),
+        )
+
+        table1 = Table('users',
+            Column('user_id',3, key='id'),
+            Column('user_name', 4, key='name'),
+            Column('user_email', 4, key='desc'),
+        )
+        
+        table3 = Table('user_clubs',
+            Column('user_id', 3),
+            Column('club_id', 4)
+        )
+        
+        sq = join(table2, table3, table2.c.id==table3.c.user_id)
+        
+        Relation(table1, table2, table1.c.id==table3.c.user_id, 
+            association = Relation(table3, table2, table3.c.club_id==table2.c.id), 
+        lazy = False)
+
+        self.runtest(
+            table1.select(includerelations = True),
+            ""
+        )
+
+
+    def donttesttableselect(self):
+        
+        print select(
+            [table2, sq],
+            sq.c.name == table2.c.name
+        )
+        
+
+        
+
+        print table.select(table.c.id == 'hithere').dump()
+
+        print table.select(and_(table2.c.id == table.c.id, table2.c.id == 'hilar')).dump()
+
+        print table.select(and_(table.c.id < 5, table.c.name == 'hilar')).dump()
+        
+
+
+        print repr(table.impl.params)
+        
+        print table.select(None).dump()
+        
+        u2 = alias(table, 'u2')
+        
+        
+    def runtest(self, clause, result, engine = None):
+        c = clause.compile(engine)
+        print "\n" + str(c) + repr(c.get_params())
+        self.assert_(str(c) == result)
+
+if __name__ == "__main__":
+    unittest.main()