From bc1cc72d89c8eb0dd76f123d77599940c3060e2d Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Sat, 3 Mar 2007 21:25:44 +0000 Subject: [PATCH] "modernized" polymorph test, name change to "test_roundtrip" --- test/orm/abc_inheritance.py | 2 +- test/orm/polymorph.py | 277 +++++++++++++++++------------------- 2 files changed, 130 insertions(+), 149 deletions(-) diff --git a/test/orm/abc_inheritance.py b/test/orm/abc_inheritance.py index d928cf91c7..766bbc5df1 100644 --- a/test/orm/abc_inheritance.py +++ b/test/orm/abc_inheritance.py @@ -49,7 +49,7 @@ def produce_test(parent, child, direction): child_table.update(values={child_table.c.parent_id:None}).execute() super(ABCTest, self).tearDown() - def test_basic(self): + def test_roundtrip(self): parent_table = {"a":ta, "b":tb, "c": tc}[parent] child_table = {"a":ta, "b":tb, "c": tc}[child] diff --git a/test/orm/polymorph.py b/test/orm/polymorph.py index b0598e9af1..d2bd814d1e 100644 --- a/test/orm/polymorph.py +++ b/test/orm/polymorph.py @@ -2,7 +2,8 @@ import testbase from sqlalchemy import * import sets -# test classes +# tests basic polymorphic mapper loading/saving, minimal relations + class Person(object): def __init__(self, **kwargs): for key, value in kwargs.iteritems(): @@ -27,10 +28,9 @@ class Company(object): def __repr__(self): return "Company %s" % self.name -class MultipleTableTest(testbase.PersistTest): - def setUpAll(self, use_person_column=False): - global companies, people, engineers, managers, metadata - metadata = BoundMetaData(testbase.db) +class PolymorphTest(testbase.ORMTest): + def define_tables(self, metadata): + global companies, people, engineers, managers # a table to store companies companies = Table('companies', metadata, @@ -60,47 +60,7 @@ class MultipleTableTest(testbase.PersistTest): metadata.create_all() - def tearDownAll(self): - metadata.drop_all() - - def tearDown(self): - clear_mappers() - for t in metadata.table_iterator(reverse=True): - t.delete().execute().close() - - def test_f_f_f(self): - self.do_test(False, False, False) - def test_f_f_t(self): - self.do_test(False, False, True) - def test_f_t_f(self): - self.do_test(False, True, False) - def test_f_t_t(self): - self.do_test(False, True, True) - def test_t_f_f(self): - self.do_test(True, False, False) - def test_t_f_t(self): - self.do_test(True, False, True) - def test_t_t_f(self): - self.do_test(True, True, False) - def test_t_t_t(self): - self.do_test(True, True, True) - def test_f_f_f_t(self): - self.do_test(False, False, False, True) - def test_f_f_t_t(self): - self.do_test(False, False, True, True) - def test_f_t_f_t(self): - self.do_test(False, True, False, True) - def test_f_t_t_t(self): - self.do_test(False, True, True, True) - def test_t_f_f_t(self): - self.do_test(True, False, False, True) - def test_t_f_t_t(self): - self.do_test(True, False, True, True) - def test_t_t_f_t(self): - self.do_test(True, True, False, True) - def test_t_t_t_t(self): - self.do_test(True, True, True, True) - +class CompileTest(PolymorphTest): def testcompile(self): person_join = polymorphic_union( { 'engineer':people.join(engineers), @@ -162,109 +122,11 @@ class MultipleTableTest(testbase.PersistTest): assert False except exceptions.ArgumentError: assert True - - def do_test(self, include_base=False, lazy_relation=True, redefine_colprop=False, use_literal_join=False): - """tests the polymorph.py example, with several options: - - include_base - whether or not to include the base 'person' type in the union. - lazy_relation - whether or not the Company relation to People is lazy or eager. - redefine_colprop - if we redefine the 'name' column to be 'people_name' on the base Person class - """ - # create a union that represents both types of joins. - if include_base: - person_join = polymorphic_union( - { - 'engineer':people.join(engineers), - 'manager':people.join(managers), - 'person':people.select(people.c.type=='person'), - }, None, 'pjoin') - else: - person_join = polymorphic_union( - { - 'engineer':people.join(engineers), - 'manager':people.join(managers), - }, None, 'pjoin') - - if redefine_colprop: - person_mapper = mapper(Person, people, select_table=person_join, polymorphic_on=person_join.c.type, polymorphic_identity='person', properties= {'person_name':people.c.name}) - else: - person_mapper = mapper(Person, people, select_table=person_join, polymorphic_on=person_join.c.type, polymorphic_identity='person') - - mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer') - mapper(Manager, managers, inherits=person_mapper, polymorphic_identity='manager') - - if use_literal_join: - mapper(Company, companies, properties={ - 'employees': relation(Person, lazy=lazy_relation, primaryjoin=people.c.company_id==companies.c.company_id, private=True, - backref="company" - ) - }) - else: - mapper(Company, companies, properties={ - 'employees': relation(Person, lazy=lazy_relation, private=True, - backref="company" - ) - }) - - if redefine_colprop: - person_attribute_name = 'person_name' - else: - person_attribute_name = 'name' - - session = create_session() - c = Company(name='company1') - c.employees.append(Manager(status='AAB', manager_name='manager1', **{person_attribute_name:'pointy haired boss'})) - c.employees.append(Engineer(status='BBA', engineer_name='engineer1', primary_language='java', **{person_attribute_name:'dilbert'})) - if include_base: - c.employees.append(Person(status='HHH', **{person_attribute_name:'joesmith'})) - c.employees.append(Engineer(status='CGG', engineer_name='engineer2', primary_language='python', **{person_attribute_name:'wally'})) - c.employees.append(Manager(status='ABA', manager_name='manager2', **{person_attribute_name:'jsmith'})) - session.save(c) - print session.new - session.flush() - session.clear() - id = c.company_id - c = session.query(Company).get(id) - for e in c.employees: - print e, e._instance_key, e.company - if include_base: - assert sets.Set([e.get_name() for e in c.employees]) == sets.Set(['pointy haired boss', 'dilbert', 'joesmith', 'wally', 'jsmith']) - else: - assert sets.Set([e.get_name() for e in c.employees]) == sets.Set(['pointy haired boss', 'dilbert', 'wally', 'jsmith']) - print "\n" - - - # test selecting from the query, using the base mapped table (people) as the selection criterion. - # in the case of the polymorphic Person query, the "people" selectable should be adapted to be "person_join" - dilbert = session.query(Person).selectfirst(people.c.name=='dilbert') - dilbert2 = session.query(Engineer).selectfirst(people.c.name=='dilbert') - assert dilbert is dilbert2 - - # test selecting from the query, joining against an alias of the base "people" table. test that - # the "palias" alias does *not* get sucked up into the "person_join" conversion. - palias = people.alias("palias") - session.query(Person).selectfirst((palias.c.name=='dilbert') & (palias.c.person_id==people.c.person_id)) - dilbert2 = session.query(Engineer).selectfirst((palias.c.name=='dilbert') & (palias.c.person_id==people.c.person_id)) - assert dilbert is dilbert2 - - session.query(Person).selectfirst((engineers.c.engineer_name=="engineer1") & (engineers.c.person_id==people.c.person_id)) - dilbert2 = session.query(Engineer).selectfirst(engineers.c.engineer_name=="engineer1") - assert dilbert is dilbert2 - - - dilbert.engineer_name = 'hes dibert!' - - session.flush() - session.clear() - - c = session.query(Company).get(id) - for e in c.employees: - print e, e._instance_key - - session.delete(c) - session.flush() +class InsertOrderTest(PolymorphTest): def test_insert_order(self): + """test that classes of multiple types mix up mapper inserts + so that insert order of individual tables is maintained""" person_join = polymorphic_union( { 'engineer':people.join(engineers), @@ -293,9 +155,128 @@ class MultipleTableTest(testbase.PersistTest): c = session.query(Company).get(c.company_id) for e in c.employees: print e, e._instance_key, e.company - + assert [e.get_name() for e in c.employees] == ['pointy haired boss', 'dilbert', 'joesmith', 'wally', 'jsmith'] +def generate_round_trip_test(include_base=False, lazy_relation=True, redefine_colprop=False, use_literal_join=False): + """generates a round trip test. + + include_base - whether or not to include the base 'person' type in the union. + lazy_relation - whether or not the Company relation to People is lazy or eager. + redefine_colprop - if we redefine the 'name' column to be 'people_name' on the base Person class + use_literal_join - primary join condition is explicitly specified + """ + class RoundTripTest(PolymorphTest): + def test_roundtrip(self): + # create a union that represents both types of joins. + if include_base: + person_join = polymorphic_union( + { + 'engineer':people.join(engineers), + 'manager':people.join(managers), + 'person':people.select(people.c.type=='person'), + }, None, 'pjoin') + else: + person_join = polymorphic_union( + { + 'engineer':people.join(engineers), + 'manager':people.join(managers), + }, None, 'pjoin') + + if redefine_colprop: + person_mapper = mapper(Person, people, select_table=person_join, polymorphic_on=person_join.c.type, polymorphic_identity='person', properties= {'person_name':people.c.name}) + else: + person_mapper = mapper(Person, people, select_table=person_join, polymorphic_on=person_join.c.type, polymorphic_identity='person') + + mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer') + mapper(Manager, managers, inherits=person_mapper, polymorphic_identity='manager') + + if use_literal_join: + mapper(Company, companies, properties={ + 'employees': relation(Person, lazy=lazy_relation, primaryjoin=people.c.company_id==companies.c.company_id, private=True, + backref="company" + ) + }) + else: + mapper(Company, companies, properties={ + 'employees': relation(Person, lazy=lazy_relation, private=True, + backref="company" + ) + }) + + if redefine_colprop: + person_attribute_name = 'person_name' + else: + person_attribute_name = 'name' + + session = create_session() + c = Company(name='company1') + c.employees.append(Manager(status='AAB', manager_name='manager1', **{person_attribute_name:'pointy haired boss'})) + c.employees.append(Engineer(status='BBA', engineer_name='engineer1', primary_language='java', **{person_attribute_name:'dilbert'})) + if include_base: + c.employees.append(Person(status='HHH', **{person_attribute_name:'joesmith'})) + c.employees.append(Engineer(status='CGG', engineer_name='engineer2', primary_language='python', **{person_attribute_name:'wally'})) + c.employees.append(Manager(status='ABA', manager_name='manager2', **{person_attribute_name:'jsmith'})) + session.save(c) + print session.new + session.flush() + session.clear() + id = c.company_id + c = session.query(Company).get(id) + for e in c.employees: + print e, e._instance_key, e.company + if include_base: + assert sets.Set([e.get_name() for e in c.employees]) == sets.Set(['pointy haired boss', 'dilbert', 'joesmith', 'wally', 'jsmith']) + else: + assert sets.Set([e.get_name() for e in c.employees]) == sets.Set(['pointy haired boss', 'dilbert', 'wally', 'jsmith']) + print "\n" + + + # test selecting from the query, using the base mapped table (people) as the selection criterion. + # in the case of the polymorphic Person query, the "people" selectable should be adapted to be "person_join" + dilbert = session.query(Person).selectfirst(people.c.name=='dilbert') + dilbert2 = session.query(Engineer).selectfirst(people.c.name=='dilbert') + assert dilbert is dilbert2 + + # test selecting from the query, joining against an alias of the base "people" table. test that + # the "palias" alias does *not* get sucked up into the "person_join" conversion. + palias = people.alias("palias") + session.query(Person).selectfirst((palias.c.name=='dilbert') & (palias.c.person_id==people.c.person_id)) + dilbert2 = session.query(Engineer).selectfirst((palias.c.name=='dilbert') & (palias.c.person_id==people.c.person_id)) + assert dilbert is dilbert2 + + session.query(Person).selectfirst((engineers.c.engineer_name=="engineer1") & (engineers.c.person_id==people.c.person_id)) + dilbert2 = session.query(Engineer).selectfirst(engineers.c.engineer_name=="engineer1") + assert dilbert is dilbert2 + + + dilbert.engineer_name = 'hes dibert!' + + session.flush() + session.clear() + + c = session.query(Company).get(id) + for e in c.employees: + print e, e._instance_key + + session.delete(c) + session.flush() + + RoundTripTest.__name__ = "Test%s%s%s%s" % ( + (lazy_relation and "Lazy" or "Eager"), + (include_base and "Inclbase" or ""), + (redefine_colprop and "Redefcol" or ""), + (use_literal_join and "Litjoin" or "") + ) + return RoundTripTest + +for include_base in [True, False]: + for lazy_relation in [True, False]: + for redefine_colprop in [True, False]: + for use_literal_join in [True, False]: + testclass = generate_round_trip_test(include_base, lazy_relation, redefine_colprop, use_literal_join) + exec("%s = testclass" % testclass.__name__) + if __name__ == "__main__": testbase.main() -- 2.47.2