]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
added more (failing) tests to query, will need to fix [ticket:932] [ticket:933]
authorMike Bayer <mike_mp@zzzcomputing.com>
Mon, 14 Jan 2008 18:50:10 +0000 (18:50 +0000)
committerMike Bayer <mike_mp@zzzcomputing.com>
Mon, 14 Jan 2008 18:50:10 +0000 (18:50 +0000)
test/orm/inheritance/polymorph.py
test/orm/inheritance/query.py

index d2d83ea3c3c3a627854f7a2208ba6c602fc2dff8..5f0e74fa351fd0d875bf2b0dcaefb1f3cd3e880f 100644 (file)
@@ -342,7 +342,6 @@ def generate_round_trip_test(include_base=False, lazy_relation=True, redefine_co
         session.delete(c)
         session.flush()
 
-
     test_roundtrip.__name__ = "test_%s%s%s%s%s" % (
         (lazy_relation and "lazy" or "eager"),
         (include_base and "_inclbase" or ""),
index 5537f94a5d5722ce45a8b358478f313cb4bfbcca..2e3d392556b7d159a72a5eef864c657f2589bd88 100644 (file)
@@ -28,219 +28,251 @@ class Machine(fixtures.Base):
 class Paperwork(fixtures.Base):
     pass
 
-class PolymorphicQueryTest(ORMTest):
-    keep_data = True
-    keep_mappers = True
-
-    def define_tables(self, metadata):
-        global companies, people, engineers, managers, boss, paperwork, machines
-
-        companies = Table('companies', metadata,
-           Column('company_id', Integer, Sequence('company_id_seq', optional=True), primary_key=True),
-           Column('name', String(50)))
-
-        people = Table('people', metadata,
-           Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
-           Column('company_id', Integer, ForeignKey('companies.company_id')),
-           Column('name', String(50)),
-           Column('type', String(30)))
-
-        engineers = Table('engineers', metadata,
-           Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
-           Column('status', String(30)),
-           Column('engineer_name', String(50)),
-           Column('primary_language', String(50)),
-          )
+def make_test(select_type):
+    class PolymorphicQueryTest(ORMTest):
+        keep_data = True
+        keep_mappers = True
+
+        def define_tables(self, metadata):
+            global companies, people, engineers, managers, boss, paperwork, machines
+
+            companies = Table('companies', metadata,
+               Column('company_id', Integer, Sequence('company_id_seq', optional=True), primary_key=True),
+               Column('name', String(50)))
+
+            people = Table('people', metadata,
+               Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
+               Column('company_id', Integer, ForeignKey('companies.company_id')),
+               Column('name', String(50)),
+               Column('type', String(30)))
+
+            engineers = Table('engineers', metadata,
+               Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
+               Column('status', String(30)),
+               Column('engineer_name', String(50)),
+               Column('primary_language', String(50)),
+              )
          
-        machines = Table('machines', metadata,
-            Column('machine_id', Integer, primary_key=True),
-            Column('name', String(50)),
-            Column('engineer_id', Integer, ForeignKey('engineers.person_id')))
+            machines = Table('machines', metadata,
+                Column('machine_id', Integer, primary_key=True),
+                Column('name', String(50)),
+                Column('engineer_id', Integer, ForeignKey('engineers.person_id')))
             
-        managers = Table('managers', metadata,
-           Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
-           Column('status', String(30)),
-           Column('manager_name', String(50))
-           )
-
-        boss = Table('boss', metadata,
-            Column('boss_id', Integer, ForeignKey('managers.person_id'), primary_key=True),
-            Column('golf_swing', String(30)),
-            )
-
-        paperwork = Table('paperwork', metadata,
-            Column('paperwork_id', Integer, primary_key=True),
-            Column('description', String(50)),
-            Column('person_id', Integer, ForeignKey('people.person_id')))
-
-        # create the most awkward polymorphic selects possible;
-        # the union does not include the "people" table by itself nor does it have
-        # "people.person_id" directly in it, and it also does not include at all
-        # the "boss" table
-        person_join = polymorphic_union(
-            {
-                'engineer':people.join(engineers),
-                'manager':people.join(managers),
-            }, None, 'pjoin')
-
-        # separate join for second-level inherit
-        manager_join = people.join(managers).outerjoin(boss)
-
-        mapper(Company, companies, properties={
-            'employees':relation(Person)
-        })
-        
-        mapper(Machine, machines)
-        
-        # testing a order_by here as well; the surrogate mapper has to adapt it
-        mapper(Person, people, select_table=person_join, polymorphic_on=people.c.type, polymorphic_identity='person', order_by=people.c.person_id, 
-            properties={
-                'paperwork':relation(Paperwork)
-            })
-        mapper(Engineer, engineers, inherits=Person, polymorphic_identity='engineer', properties={
-                'machines':relation(Machine)
+            managers = Table('managers', metadata,
+               Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
+               Column('status', String(30)),
+               Column('manager_name', String(50))
+               )
+
+            boss = Table('boss', metadata,
+                Column('boss_id', Integer, ForeignKey('managers.person_id'), primary_key=True),
+                Column('golf_swing', String(30)),
+                )
+
+            paperwork = Table('paperwork', metadata,
+                Column('paperwork_id', Integer, primary_key=True),
+                Column('description', String(50)),
+                Column('person_id', Integer, ForeignKey('people.person_id')))
+
+            clear_mappers()
+            
+            mapper(Company, companies, properties={
+                'employees':relation(Person)
             })
-        mapper(Manager, managers, select_table=manager_join, inherits=Person, polymorphic_identity='manager')
-        mapper(Boss, boss, inherits=Manager, polymorphic_identity='boss')
-        mapper(Paperwork, paperwork)
-
-    def insert_data(self):
-        global all_employees, c1_employees, c2_employees, e1, e2, b1, m1, e3, c1, c2
-
-        c1 = Company(name="MegaCorp, Inc.")
-        c2 = Company(name="Elbonia, Inc.")
-        e1 = Engineer(name="dilbert", engineer_name="dilbert", primary_language="java", status="regular engineer", paperwork=[
-            Paperwork(description="tps report #1"),
-            Paperwork(description="tps report #2")
-        ], machines=[
-            Machine(name='IBM ThinkPad'),
-            Machine(name='IPhone'),
-        ])
-        e2 = Engineer(name="wally", engineer_name="wally", primary_language="c++", status="regular engineer", paperwork=[
-            Paperwork(description="tps report #3"),
-            Paperwork(description="tps report #4")
-        ], machines=[
-            Machine(name="Commodore 64")
-        ])
-        b1 = Boss(name="pointy haired boss", golf_swing="fore", manager_name="pointy", status="da boss", paperwork=[
-            Paperwork(description="review #1"),
-        ])
-        m1 = Manager(name="dogbert", manager_name="dogbert", status="regular manager", paperwork=[
-            Paperwork(description="review #2"),
-            Paperwork(description="review #3")
-        ])
-        c1.employees = [e1, e2, b1, m1]
-
-        e3 = Engineer(name="vlad", engineer_name="vlad", primary_language="cobol", status="elbonian engineer", paperwork=[
-            Paperwork(description='elbonian missive #3')
-        ], machines=[
-                Machine(name="Commodore 64"),
-                Machine(name="IBM 3270")
-        ])
+
+            mapper(Machine, machines)
+
+            if select_type == '':
+                person_join = manager_join = None
+            elif select_type == 'Unions':
+                person_join = polymorphic_union(
+                    {
+                        'engineer':people.join(engineers),
+                        'manager':people.join(managers),
+                    }, None, 'pjoin')
+
+                manager_join = people.join(managers).outerjoin(boss)
+            elif select_type == 'AliasedJoins':
+                person_join = people.outerjoin(engineers).outerjoin(managers).select(use_labels=True).alias('pjoin')
+                manager_join = people.join(managers).outerjoin(boss).select(use_labels=True).alias('mjoin')
+            elif select_type == 'Joins':
+                person_join = people.outerjoin(engineers).outerjoin(managers)
+                manager_join = people.join(managers).outerjoin(boss)
+
+
+            # testing a order_by here as well; the surrogate mapper has to adapt it
+            mapper(Person, people, 
+                select_table=person_join, 
+                polymorphic_on=people.c.type, polymorphic_identity='person', order_by=people.c.person_id, 
+                properties={
+                    'paperwork':relation(Paperwork)
+                })
+            mapper(Engineer, engineers, inherits=Person, polymorphic_identity='engineer', properties={
+                    'machines':relation(Machine)
+                })
+            mapper(Manager, managers, select_table=manager_join, inherits=Person, polymorphic_identity='manager')
+            mapper(Boss, boss, inherits=Manager, polymorphic_identity='boss')
+            mapper(Paperwork, paperwork)
+        
+
+        def insert_data(self):
+            global all_employees, c1_employees, c2_employees, e1, e2, b1, m1, e3, c1, c2
+
+            c1 = Company(name="MegaCorp, Inc.")
+            c2 = Company(name="Elbonia, Inc.")
+            e1 = Engineer(name="dilbert", engineer_name="dilbert", primary_language="java", status="regular engineer", paperwork=[
+                Paperwork(description="tps report #1"),
+                Paperwork(description="tps report #2")
+            ], machines=[
+                Machine(name='IBM ThinkPad'),
+                Machine(name='IPhone'),
+            ])
+            e2 = Engineer(name="wally", engineer_name="wally", primary_language="c++", status="regular engineer", paperwork=[
+                Paperwork(description="tps report #3"),
+                Paperwork(description="tps report #4")
+            ], machines=[
+                Machine(name="Commodore 64")
+            ])
+            b1 = Boss(name="pointy haired boss", golf_swing="fore", manager_name="pointy", status="da boss", paperwork=[
+                Paperwork(description="review #1"),
+            ])
+            m1 = Manager(name="dogbert", manager_name="dogbert", status="regular manager", paperwork=[
+                Paperwork(description="review #2"),
+                Paperwork(description="review #3")
+            ])
+            c1.employees = [e1, e2, b1, m1]
+
+            e3 = Engineer(name="vlad", engineer_name="vlad", primary_language="cobol", status="elbonian engineer", paperwork=[
+                Paperwork(description='elbonian missive #3')
+            ], machines=[
+                    Machine(name="Commodore 64"),
+                    Machine(name="IBM 3270")
+            ])
         
-        c2.employees = [e3]
-        sess = create_session()
-        sess.save(c1)
-        sess.save(c2)
-        sess.flush()
-        sess.clear()
+            c2.employees = [e3]
+            sess = create_session()
+            sess.save(c1)
+            sess.save(c2)
+            sess.flush()
+            sess.clear()
 
-        all_employees = [e1, e2, b1, m1, e3]
-        c1_employees = [e1, e2, b1, m1]
-        c2_employees = [e3]
+            all_employees = [e1, e2, b1, m1, e3]
+            c1_employees = [e1, e2, b1, m1]
+            c2_employees = [e3]
 
-    def test_filter_on_subclass(self):
-        sess = create_session()
-        self.assertEquals(sess.query(Engineer).all()[0], Engineer(name="dilbert"))
+        def test_filter_on_subclass(self):
+            sess = create_session()
+            self.assertEquals(sess.query(Engineer).all()[0], Engineer(name="dilbert"))
 
-        self.assertEquals(sess.query(Engineer).first(), Engineer(name="dilbert"))
+            self.assertEquals(sess.query(Engineer).first(), Engineer(name="dilbert"))
 
-        self.assertEquals(sess.query(Engineer).filter(Engineer.person_id==e1.person_id).first(), Engineer(name="dilbert"))
+            self.assertEquals(sess.query(Engineer).filter(Engineer.person_id==e1.person_id).first(), Engineer(name="dilbert"))
 
-        self.assertEquals(sess.query(Manager).filter(Manager.person_id==m1.person_id).one(), Manager(name="dogbert"))
+            self.assertEquals(sess.query(Manager).filter(Manager.person_id==m1.person_id).one(), Manager(name="dogbert"))
 
-        self.assertEquals(sess.query(Manager).filter(Manager.person_id==b1.person_id).one(), Boss(name="pointy haired boss"))
+            self.assertEquals(sess.query(Manager).filter(Manager.person_id==b1.person_id).one(), Boss(name="pointy haired boss"))
         
-        self.assertEquals(sess.query(Boss).filter(Boss.person_id==b1.person_id).one(), Boss(name="pointy haired boss"))
+            self.assertEquals(sess.query(Boss).filter(Boss.person_id==b1.person_id).one(), Boss(name="pointy haired boss"))
 
-    def test_join_from_polymorphic(self):
-        sess = create_session()
+        def test_join_from_polymorphic(self):
+            sess = create_session()
         
-        for aliased in (True, False):
-            self.assertEquals(sess.query(Person).join('paperwork', aliased=aliased).filter(Paperwork.description.like('%review%')).all(), [b1, m1])
+            for aliased in (True, False):
+                self.assertEquals(sess.query(Person).join('paperwork', aliased=aliased).filter(Paperwork.description.like('%review%')).all(), [b1, m1])
 
-            self.assertEquals(sess.query(Person).join('paperwork', aliased=aliased).filter(Paperwork.description.like('%#2%')).all(), [e1, m1])
+                self.assertEquals(sess.query(Person).join('paperwork', aliased=aliased).filter(Paperwork.description.like('%#2%')).all(), [e1, m1])
 
-            self.assertEquals(sess.query(Engineer).join('paperwork', aliased=aliased).filter(Paperwork.description.like('%#2%')).all(), [e1])
+                self.assertEquals(sess.query(Engineer).join('paperwork', aliased=aliased).filter(Paperwork.description.like('%#2%')).all(), [e1])
 
-            self.assertEquals(sess.query(Person).join('paperwork', aliased=aliased).filter(Person.c.name.like('%dog%')).filter(Paperwork.description.like('%#2%')).all(), [m1])
+                self.assertEquals(sess.query(Person).join('paperwork', aliased=aliased).filter(Person.c.name.like('%dog%')).filter(Paperwork.description.like('%#2%')).all(), [m1])
     
-    def test_join_to_polymorphic(self):
-        sess = create_session()
-        self.assertEquals(sess.query(Company).join('employees').filter(Person.name=='vlad').one(), c2)
-
-        self.assertEquals(sess.query(Company).join('employees', aliased=True).filter(Person.name=='vlad').one(), c2)
+        def test_join_to_polymorphic(self):
+            if select_type == 'Joins':
+                return
+                
+            sess = create_session()
+            self.assertEquals(sess.query(Company).join('employees').filter(Person.name=='vlad').one(), c2)
+
+            self.assertEquals(sess.query(Company).join('employees', aliased=True).filter(Person.name=='vlad').one(), c2)
     
-    def test_join_to_subclass(self):
-        sess = create_session()
+        def test_join_to_subclass(self):
+            if select_type in ('Joins', ''):
+                return
+
+            sess = create_session()
 
-        self.assertEquals(sess.query(Person).join(Engineer.machines).all(), [e1, e2, e3])
+            self.assertEquals(sess.query(Company).select_from(companies.join(people).join(engineers)).filter(Engineer.primary_language=='java').all(), [c1])
+        
+            self.assertEquals(sess.query(Company).join(['employees']).filter(Engineer.primary_language=='java').all(), [c1])
+        
+            self.assertEquals(sess.query(Person).join(Engineer.machines).all(), [e1, e2, e3])
 
-        self.assertEquals(sess.query(Person).join(Engineer.machines).filter(Machine.name.ilike("%ibm%")).all(), [e1, e3])
+            self.assertEquals(sess.query(Person).join(Engineer.machines).filter(Machine.name.ilike("%ibm%")).all(), [e1, e3])
         
-        self.assertEquals(sess.query(Company).join(['employees', Engineer.machines]).all(), [c1, c2])
+            self.assertEquals(sess.query(Company).join(['employees', Engineer.machines]).all(), [c1, c2])
 
-        self.assertEquals(sess.query(Company).join(['employees', Engineer.machines]).filter(Machine.name.ilike("%thinkpad%")).all(), [c1])
+            self.assertEquals(sess.query(Company).join(['employees', Engineer.machines]).filter(Machine.name.ilike("%thinkpad%")).all(), [c1])
         
-    def test_join_through_polymorphic(self):
-        sess = create_session()
-
-        for aliased in (True, False):
-            self.assertEquals(
-                sess.query(Company).\
-                    join(['employees', 'paperwork'], aliased=aliased).filter(Paperwork.description.like('%#2%')).all(),
-                [c1]
-            )
-
-            self.assertEquals(
-                sess.query(Company).\
-                    join(['employees', 'paperwork'], aliased=aliased).filter(Paperwork.description.like('%#%')).all(),
-                [c1, c2]
-            )
-
-            self.assertEquals(
-                sess.query(Company).\
-                    join(['employees', 'paperwork'], aliased=aliased).filter(Person.name.in_(['dilbert', 'vlad'])).filter(Paperwork.description.like('%#2%')).all(),
-                [c1]
-            )
+        def test_join_through_polymorphic(self):
+            if select_type == 'Joins':
+                return
+
+            sess = create_session()
+
+            for aliased in (True, False):
+                self.assertEquals(
+                    sess.query(Company).\
+                        join(['employees', 'paperwork'], aliased=aliased).filter(Paperwork.description.like('%#2%')).all(),
+                    [c1]
+                )
+
+                self.assertEquals(
+                    sess.query(Company).\
+                        join(['employees', 'paperwork'], aliased=aliased).filter(Paperwork.description.like('%#%')).all(),
+                    [c1, c2]
+                )
+
+                self.assertEquals(
+                    sess.query(Company).\
+                        join(['employees', 'paperwork'], aliased=aliased).filter(Person.name.in_(['dilbert', 'vlad'])).filter(Paperwork.description.like('%#2%')).all(),
+                    [c1]
+                )
         
-            self.assertEquals(
-                sess.query(Company).\
-                    join(['employees', 'paperwork'], aliased=aliased).filter(Person.name.in_(['dilbert', 'vlad'])).filter(Paperwork.description.like('%#%')).all(),
-                [c1, c2]
-            )
-
-            self.assertEquals(
-                sess.query(Company).join('employees', aliased=aliased).filter(Person.name.in_(['dilbert', 'vlad'])).\
-                    join('paperwork', from_joinpoint=True, aliased=aliased).filter(Paperwork.description.like('%#2%')).all(),
-                [c1]
-            )
-
-            self.assertEquals(
-                sess.query(Company).join('employees', aliased=aliased).filter(Person.name.in_(['dilbert', 'vlad'])).\
-                    join('paperwork', from_joinpoint=True, aliased=aliased).filter(Paperwork.description.like('%#%')).all(),
-                [c1, c2]
-            )
+                self.assertEquals(
+                    sess.query(Company).\
+                        join(['employees', 'paperwork'], aliased=aliased).filter(Person.name.in_(['dilbert', 'vlad'])).filter(Paperwork.description.like('%#%')).all(),
+                    [c1, c2]
+                )
+
+                self.assertEquals(
+                    sess.query(Company).join('employees', aliased=aliased).filter(Person.name.in_(['dilbert', 'vlad'])).\
+                        join('paperwork', from_joinpoint=True, aliased=aliased).filter(Paperwork.description.like('%#2%')).all(),
+                    [c1]
+                )
+
+                self.assertEquals(
+                    sess.query(Company).join('employees', aliased=aliased).filter(Person.name.in_(['dilbert', 'vlad'])).\
+                        join('paperwork', from_joinpoint=True, aliased=aliased).filter(Paperwork.description.like('%#%')).all(),
+                    [c1, c2]
+                )
         
-    def test_filter_on_baseclass(self):
-        sess = create_session()
+        def test_filter_on_baseclass(self):
+            sess = create_session()
 
-        self.assertEquals(sess.query(Person).all(), all_employees)
+            self.assertEquals(sess.query(Person).all(), all_employees)
 
-        self.assertEquals(sess.query(Person).first(), all_employees[0])
+            self.assertEquals(sess.query(Person).first(), all_employees[0])
         
-        self.assertEquals(sess.query(Person).filter(Person.person_id==e2.person_id).one(), e2)
+            self.assertEquals(sess.query(Person).filter(Person.person_id==e2.person_id).one(), e2)
+    
+    PolymorphicQueryTest.__name__ = "Polymorphic%sTest" % select_type
+    return PolymorphicQueryTest
 
+for select_type in ('', 'Unions', 'AliasedJoins', 'Joins'):
+    testclass = make_test(select_type)
+    exec("%s = testclass" % testclass.__name__)
+    
+del testclass
 
 if __name__ == "__main__":
     testenv.main()