From 9b86654981ce0c172ad09401ad41e15d5b8d0487 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Thu, 16 Feb 2012 18:54:10 -0500 Subject: [PATCH] - fix some unclear phrases in query regarding polymorphic, slowly approaching [ticket:2333] - pep8 most of the big old polymorphic tests, break lots of the inheritance/test_query tests into individual tests since these are the ones that are easily broken when screwing with query --- lib/sqlalchemy/orm/query.py | 34 +- ...st_polymorph2.py => test_assorted_poly.py} | 560 ++++-- ..._polymorph.py => test_poly_persistence.py} | 156 +- ...{test_query.py => test_polymorphic_rel.py} | 1587 ++++++----------- test/orm/inheritance/test_relationship.py | 877 +++++++++ 5 files changed, 1888 insertions(+), 1326 deletions(-) rename test/orm/inheritance/{test_polymorph2.py => test_assorted_poly.py} (66%) rename test/orm/inheritance/{test_polymorph.py => test_poly_persistence.py} (60%) rename test/orm/inheritance/{test_query.py => test_polymorphic_rel.py} (56%) create mode 100644 test/orm/inheritance/test_relationship.py diff --git a/lib/sqlalchemy/orm/query.py b/lib/sqlalchemy/orm/query.py index a00d4078a0..6321715f4d 100644 --- a/lib/sqlalchemy/orm/query.py +++ b/lib/sqlalchemy/orm/query.py @@ -133,7 +133,7 @@ class Query(object): with_polymorphic = mapper._with_polymorphic_mappers if mapper.mapped_table not in \ self._polymorphic_adapters: - self.__mapper_loads_polymorphically_with(mapper, + self._mapper_loads_polymorphically_with(mapper, sql_util.ColumnAdapter( selectable, mapper._equivalent_columns)) @@ -150,7 +150,7 @@ class Query(object): is_aliased_class, with_polymorphic) ent.setup_entity(entity, *d[entity]) - def __mapper_loads_polymorphically_with(self, mapper, adapter): + def _mapper_loads_polymorphically_with(self, mapper, adapter): for m2 in mapper._with_polymorphic_mappers: self._polymorphic_adapters[m2] = adapter for m in m2.iterate_to_root(): @@ -174,10 +174,6 @@ class Query(object): self._from_obj_alias = sql_util.ColumnAdapter( self._from_obj[0], equivs) - def _get_polymorphic_adapter(self, entity, selectable): - self.__mapper_loads_polymorphically_with(entity.mapper, - sql_util.ColumnAdapter(selectable, - entity.mapper._equivalent_columns)) def _reset_polymorphic_adapter(self, mapper): for m2 in mapper._with_polymorphic_mappers: @@ -325,13 +321,6 @@ class Query(object): ) return self._entity_zero() - def _generate_mapper_zero(self): - if not getattr(self._entities[0], 'primary_entity', False): - raise sa_exc.InvalidRequestError( - "No primary mapper set up for this Query.") - entity = self._entities[0]._clone() - self._entities = [entity] + self._entities[1:] - return entity def __all_equivs(self): equivs = {} @@ -602,7 +591,12 @@ class Query(object): such as concrete table mappers. """ - entity = self._generate_mapper_zero() + + if not getattr(self._entities[0], 'primary_entity', False): + raise sa_exc.InvalidRequestError( + "No primary mapper set up for this Query.") + entity = self._entities[0]._clone() + self._entities = [entity] + self._entities[1:] entity.set_with_polymorphic(self, cls_or_mappers, selectable=selectable, @@ -1584,7 +1578,6 @@ class Query(object): consistent format with which to form the actual JOIN constructs. """ - self._polymorphic_adapters = self._polymorphic_adapters.copy() if not from_joinpoint: self._reset_joinpoint() @@ -1684,6 +1677,8 @@ class Query(object): onclause, outerjoin, create_aliases, prop): """append a JOIN to the query's from clause.""" + self._polymorphic_adapters = self._polymorphic_adapters.copy() + if left is None: if self._from_obj: left = self._from_obj[0] @@ -1759,7 +1754,8 @@ class Query(object): # until reset_joinpoint() is called. if need_adapter: self._filter_aliases = ORMAdapter(right, - equivalents=right_mapper and right_mapper._equivalent_columns or {}, + equivalents=right_mapper and + right_mapper._equivalent_columns or {}, chain_to=self._filter_aliases) # if the onclause is a ClauseElement, adapt it with any @@ -1772,7 +1768,7 @@ class Query(object): # ensure that columns retrieved from this target in the result # set are also adapted. if aliased_entity and not create_aliases: - self.__mapper_loads_polymorphically_with( + self._mapper_loads_polymorphically_with( right_mapper, ORMAdapter( right, @@ -2960,7 +2956,9 @@ class _MapperEntity(_QueryEntity): # with_polymorphic() can be applied to aliases if not self.is_aliased_class: self.selectable = from_obj - self.adapter = query._get_polymorphic_adapter(self, from_obj) + query._mapper_loads_polymorphically_with(self.mapper, + sql_util.ColumnAdapter(from_obj, + self.mapper._equivalent_columns)) filter_fn = id diff --git a/test/orm/inheritance/test_polymorph2.py b/test/orm/inheritance/test_assorted_poly.py similarity index 66% rename from test/orm/inheritance/test_polymorph2.py rename to test/orm/inheritance/test_assorted_poly.py index b7ed5668ed..0a10071ea5 100644 --- a/test/orm/inheritance/test_polymorph2.py +++ b/test/orm/inheritance/test_assorted_poly.py @@ -1,5 +1,6 @@ -"""this is a test suite consisting mainly of end-user test cases, testing all kinds of painful -inheritance setups for which we maintain compatibility. +"""Very old inheritance-related tests. + + """ from test.lib.testing import eq_ @@ -28,13 +29,18 @@ class RelationshipTest1(fixtures.MappedTest): global people, managers people = Table('people', metadata, - Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True), - Column('manager_id', Integer, ForeignKey('managers.person_id', use_alter=True, name="mpid_fq")), + Column('person_id', Integer, Sequence('person_id_seq', + optional=True), + primary_key=True), + Column('manager_id', Integer, + ForeignKey('managers.person_id', + use_alter=True, name="mpid_fq")), Column('name', String(50)), Column('type', String(30))) managers = Table('managers', metadata, - Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True), + Column('person_id', Integer, ForeignKey('people.person_id'), + primary_key=True), Column('status', String(30)), Column('manager_name', String(50)) ) @@ -49,17 +55,17 @@ class RelationshipTest1(fixtures.MappedTest): class Manager(Person): pass - # note that up until recently (0.4.4), we had to specify "foreign_keys" here - # for this primary join. mapper(Person, people, properties={ - 'manager':relationship(Manager, primaryjoin=(people.c.manager_id == - managers.c.person_id), + 'manager':relationship(Manager, primaryjoin=( + people.c.manager_id == + managers.c.person_id), uselist=False, post_update=True) }) mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id) - eq_(class_mapper(Person).get_property('manager').synchronize_pairs, [(managers.c.person_id,people.c.manager_id)]) + eq_(class_mapper(Person).get_property('manager').synchronize_pairs, + [(managers.c.person_id,people.c.manager_id)]) session = create_session() p = Person(name='some person') @@ -71,7 +77,6 @@ class RelationshipTest1(fixtures.MappedTest): p = session.query(Person).get(p.person_id) m = session.query(Manager).get(m.person_id) - print p, m, p.manager assert p.manager is m def test_descendant_refs_parent(self): @@ -81,8 +86,12 @@ class RelationshipTest1(fixtures.MappedTest): pass mapper(Person, people) - mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id, properties={ - 'employee':relationship(Person, primaryjoin=(people.c.manager_id == + mapper(Manager, managers, inherits=Person, + inherit_condition=people.c.person_id== + managers.c.person_id, + properties={ + 'employee':relationship(Person, primaryjoin=( + people.c.manager_id == managers.c.person_id), foreign_keys=[people.c.manager_id], uselist=False, post_update=True) @@ -98,7 +107,6 @@ class RelationshipTest1(fixtures.MappedTest): p = session.query(Person).get(p.person_id) m = session.query(Manager).get(m.person_id) - print p, m, m.employee assert m.employee is p class RelationshipTest2(fixtures.MappedTest): @@ -107,18 +115,21 @@ class RelationshipTest2(fixtures.MappedTest): def define_tables(cls, metadata): global people, managers, data people = Table('people', metadata, - Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('person_id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('name', String(50)), Column('type', String(30))) managers = Table('managers', metadata, - Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True), + Column('person_id', Integer, ForeignKey('people.person_id'), + primary_key=True), Column('manager_id', Integer, ForeignKey('people.person_id')), Column('status', String(30)), ) data = Table('data', metadata, - Column('person_id', Integer, ForeignKey('managers.person_id'), primary_key=True), + Column('person_id', Integer, ForeignKey('managers.person_id'), + primary_key=True), Column('data', String(30)) ) @@ -144,13 +155,15 @@ class RelationshipTest2(fixtures.MappedTest): if jointype == "join1": poly_union = polymorphic_union({ 'person':people.select(people.c.type=='person'), - 'manager':join(people, managers, people.c.person_id==managers.c.person_id) + 'manager':join(people, managers, + people.c.person_id==managers.c.person_id) }, None) polymorphic_on=poly_union.c.type elif jointype == "join2": poly_union = polymorphic_union({ 'person':people.select(people.c.type=='person'), - 'manager':managers.join(people, people.c.person_id==managers.c.person_id) + 'manager':managers.join(people, + people.c.person_id==managers.c.person_id) }, None) polymorphic_on=poly_union.c.type elif jointype == "join3": @@ -163,19 +176,36 @@ class RelationshipTest2(fixtures.MappedTest): self.data = data mapper(Data, data) - mapper(Person, people, with_polymorphic=('*', poly_union), polymorphic_identity='person', polymorphic_on=polymorphic_on) + mapper(Person, people, + with_polymorphic=('*', poly_union), + polymorphic_identity='person', + polymorphic_on=polymorphic_on) if usedata: - mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id, polymorphic_identity='manager', + mapper(Manager, managers, + inherits=Person, + inherit_condition=people.c.person_id== + managers.c.person_id, + polymorphic_identity='manager', properties={ - 'colleague':relationship(Person, primaryjoin=managers.c.manager_id==people.c.person_id, lazy='select', uselist=False), + 'colleague':relationship( + Person, + primaryjoin=managers.c.manager_id== + people.c.person_id, + lazy='select', uselist=False), 'data':relationship(Data, uselist=False) } ) else: - mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id, polymorphic_identity='manager', + mapper(Manager, managers, inherits=Person, + inherit_condition=people.c.person_id== + managers.c.person_id, + polymorphic_identity='manager', properties={ - 'colleague':relationship(Person, primaryjoin=managers.c.manager_id==people.c.person_id, lazy='select', uselist=False) + 'colleague':relationship(Person, + primaryjoin=managers.c.manager_id== + people.c.person_id, + lazy='select', uselist=False) } ) @@ -191,8 +221,6 @@ class RelationshipTest2(fixtures.MappedTest): sess.expunge_all() p = sess.query(Person).get(p.person_id) m = sess.query(Manager).get(m.person_id) - print p - print m assert m.colleague is p if usedata: assert m.data.data == 'ms data' @@ -203,18 +231,21 @@ class RelationshipTest3(fixtures.MappedTest): def define_tables(cls, metadata): global people, managers, data people = Table('people', metadata, - Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('person_id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('colleague_id', Integer, ForeignKey('people.person_id')), Column('name', String(50)), Column('type', String(30))) managers = Table('managers', metadata, - Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True), + Column('person_id', Integer, ForeignKey('people.person_id'), + primary_key=True), Column('status', String(30)), ) data = Table('data', metadata, - Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True), + Column('person_id', Integer, ForeignKey('people.person_id'), + primary_key=True), Column('data', String(30)) ) @@ -232,12 +263,14 @@ def _generate_test(jointype="join1", usedata=False): if jointype == "join1": poly_union = polymorphic_union({ - 'manager':managers.join(people, people.c.person_id==managers.c.person_id), + 'manager':managers.join(people, + people.c.person_id==managers.c.person_id), 'person':people.select(people.c.type=='person') }, None) elif jointype =="join2": poly_union = polymorphic_union({ - 'manager':join(people, managers, people.c.person_id==managers.c.person_id), + 'manager':join(people, managers, + people.c.person_id==managers.c.person_id), 'person':people.select(people.c.type=='person') }, None) elif jointype == 'join3': @@ -249,21 +282,35 @@ def _generate_test(jointype="join1", usedata=False): mapper(Data, data) if usedata: - mapper(Person, people, with_polymorphic=('*', poly_union), polymorphic_identity='person', polymorphic_on=people.c.type, + mapper(Person, people, + with_polymorphic=('*', poly_union), + polymorphic_identity='person', + polymorphic_on=people.c.type, properties={ - 'colleagues':relationship(Person, primaryjoin=people.c.colleague_id==people.c.person_id, remote_side=people.c.colleague_id, uselist=True), + 'colleagues':relationship(Person, + primaryjoin=people.c.colleague_id== + people.c.person_id, + remote_side=people.c.colleague_id, + uselist=True), 'data':relationship(Data, uselist=False) } ) else: - mapper(Person, people, with_polymorphic=('*', poly_union), polymorphic_identity='person', polymorphic_on=people.c.type, + mapper(Person, people, + with_polymorphic=('*', poly_union), + polymorphic_identity='person', + polymorphic_on=people.c.type, properties={ - 'colleagues':relationship(Person, primaryjoin=people.c.colleague_id==people.c.person_id, + 'colleagues':relationship(Person, + primaryjoin=people.c.colleague_id==people.c.person_id, remote_side=people.c.colleague_id, uselist=True) } ) - mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id, polymorphic_identity='manager') + mapper(Manager, managers, inherits=Person, + inherit_condition=people.c.person_id== + managers.c.person_id, + polymorphic_identity='manager') sess = create_session() p = Person(name='person1') @@ -285,7 +332,6 @@ def _generate_test(jointype="join1", usedata=False): p2 = sess.query(Person).get(p2.person_id) p3 = sess.query(Person).get(p3.person_id) m = sess.query(Person).get(m.person_id) - print p, p2, p.colleagues, m.colleagues assert len(p.colleagues) == 1 assert p.colleagues == [p2] assert m.colleagues == [p3] @@ -309,26 +355,32 @@ class RelationshipTest4(fixtures.MappedTest): def define_tables(cls, metadata): global people, engineers, managers, cars people = Table('people', metadata, - Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('person_id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('name', String(50))) engineers = Table('engineers', metadata, - Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True), + Column('person_id', Integer, ForeignKey('people.person_id'), + primary_key=True), Column('status', String(30))) managers = Table('managers', metadata, - Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True), + Column('person_id', Integer, ForeignKey('people.person_id'), + primary_key=True), Column('longer_status', String(70))) cars = Table('cars', metadata, - Column('car_id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('car_id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('owner', Integer, ForeignKey('people.person_id'))) - def testmanytoonepolymorphic(self): - """in this test, the polymorphic union is between two subclasses, but does not include the base table by itself - in the union. however, the primaryjoin condition is going to be against the base table, and its a many-to-one - relationship (unlike the test in polymorph.py) so the column in the base table is explicit. Can the ClauseAdapter - figure out how to alias the primaryjoin to the polymorphic union ?""" + def test_many_to_one_polymorphic(self): + """in this test, the polymorphic union is between two subclasses, but + does not include the base table by itself in the union. however, the + primaryjoin condition is going to be against the base table, and its a + many-to-one relationship (unlike the test in polymorph.py) so the + column in the base table is explicit. Can the ClauseAdapter figure out + how to alias the primaryjoin to the polymorphic union ?""" # class definitions class Person(object): @@ -339,10 +391,12 @@ class RelationshipTest4(fixtures.MappedTest): return "Ordinary person %s" % self.name class Engineer(Person): def __repr__(self): - return "Engineer %s, status %s" % (self.name, self.status) + return "Engineer %s, status %s" % \ + (self.name, self.status) class Manager(Person): def __repr__(self): - return "Manager %s, status %s" % (self.name, self.longer_status) + return "Manager %s, status %s" % \ + (self.name, self.longer_status) class Car(object): def __init__(self, **kwargs): for key, value in kwargs.iteritems(): @@ -357,24 +411,36 @@ class RelationshipTest4(fixtures.MappedTest): 'manager':people.join(managers), }, "type", 'employee_join') - person_mapper = mapper(Person, people, with_polymorphic=('*', employee_join), polymorphic_on=employee_join.c.type, polymorphic_identity='person') - engineer_mapper = mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer') - manager_mapper = mapper(Manager, managers, inherits=person_mapper, polymorphic_identity='manager') - car_mapper = mapper(Car, cars, properties= {'employee':relationship(person_mapper)}) + person_mapper = mapper(Person, people, + with_polymorphic=('*', employee_join), + polymorphic_on=employee_join.c.type, + polymorphic_identity='person') + engineer_mapper = mapper(Engineer, engineers, + inherits=person_mapper, + polymorphic_identity='engineer') + manager_mapper = mapper(Manager, managers, + inherits=person_mapper, + polymorphic_identity='manager') + car_mapper = mapper(Car, cars, + properties= {'employee': + relationship(person_mapper)}) session = create_session() # creating 5 managers named from M1 to E5 for i in range(1,5): - session.add(Manager(name="M%d" % i,longer_status="YYYYYYYYY")) + session.add(Manager(name="M%d" % i, + longer_status="YYYYYYYYY")) # creating 5 engineers named from E1 to E5 for i in range(1,5): session.add(Engineer(name="E%d" % i,status="X")) session.flush() - engineer4 = session.query(Engineer).filter(Engineer.name=="E4").first() - manager3 = session.query(Manager).filter(Manager.name=="M3").first() + engineer4 = session.query(Engineer).\ + filter(Engineer.name=="E4").first() + manager3 = session.query(Manager).\ + filter(Manager.name=="M3").first() car1 = Car(employee=engineer4) session.add(car1) @@ -385,30 +451,27 @@ class RelationshipTest4(fixtures.MappedTest): session.expunge_all() def go(): - testcar = session.query(Car).options(joinedload('employee')).get(car1.car_id) + testcar = session.query(Car).options( + joinedload('employee') + ).get(car1.car_id) assert str(testcar.employee) == "Engineer E4, status X" self.assert_sql_count(testing.db, go, 1) - print "----------------------------" car1 = session.query(Car).get(car1.car_id) - print "----------------------------" usingGet = session.query(person_mapper).get(car1.owner) - print "----------------------------" usingProperty = car1.employee - print "----------------------------" - # All print should output the same person (engineer E4) assert str(engineer4) == "Engineer E4, status X" - print str(usingGet) assert str(usingGet) == "Engineer E4, status X" assert str(usingProperty) == "Engineer E4, status X" session.expunge_all() - print "-----------------------------------------------------------------" # and now for the lightning round, eager ! def go(): - testcar = session.query(Car).options(joinedload('employee')).get(car1.car_id) + testcar = session.query(Car).options( + joinedload('employee') + ).get(car1.car_id) assert str(testcar.employee) == "Engineer E4, status X" self.assert_sql_count(testing.db, go, 1) @@ -422,25 +485,29 @@ class RelationshipTest5(fixtures.MappedTest): def define_tables(cls, metadata): global people, engineers, managers, cars people = Table('people', metadata, - Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('person_id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('name', String(50)), Column('type', String(50))) engineers = Table('engineers', metadata, - Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True), + Column('person_id', Integer, ForeignKey('people.person_id'), + primary_key=True), Column('status', String(30))) managers = Table('managers', metadata, - Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True), + Column('person_id', Integer, ForeignKey('people.person_id'), + primary_key=True), Column('longer_status', String(70))) cars = Table('cars', metadata, - Column('car_id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('car_id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('owner', Integer, ForeignKey('people.person_id'))) def test_eager_empty(self): - """test parent object with child relationship to an inheriting mapper, using eager loads, - works when there are no child objects present""" + """test parent object with child relationship to an inheriting mapper, + using eager loads, works when there are no child objects present""" class Person(object): def __init__(self, **kwargs): @@ -450,10 +517,12 @@ class RelationshipTest5(fixtures.MappedTest): return "Ordinary person %s" % self.name class Engineer(Person): def __repr__(self): - return "Engineer %s, status %s" % (self.name, self.status) + return "Engineer %s, status %s" % \ + (self.name, self.status) class Manager(Person): def __repr__(self): - return "Manager %s, status %s" % (self.name, self.longer_status) + return "Manager %s, status %s" % \ + (self.name, self.longer_status) class Car(object): def __init__(self, **kwargs): for key, value in kwargs.iteritems(): @@ -461,10 +530,18 @@ class RelationshipTest5(fixtures.MappedTest): def __repr__(self): return "Car number %d" % self.car_id - person_mapper = mapper(Person, people, polymorphic_on=people.c.type, polymorphic_identity='person') - engineer_mapper = mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer') - manager_mapper = mapper(Manager, managers, inherits=person_mapper, polymorphic_identity='manager') - car_mapper = mapper(Car, cars, properties= {'manager':relationship(manager_mapper, lazy='joined')}) + person_mapper = mapper(Person, people, + polymorphic_on=people.c.type, + polymorphic_identity='person') + engineer_mapper = mapper(Engineer, engineers, + inherits=person_mapper, + polymorphic_identity='engineer') + manager_mapper = mapper(Manager, managers, + inherits=person_mapper, + polymorphic_identity='manager') + car_mapper = mapper(Car, cars, properties= { + 'manager':relationship( + manager_mapper, lazy='joined')}) sess = create_session() car1 = Car() @@ -480,34 +557,42 @@ class RelationshipTest5(fixtures.MappedTest): assert carlist[1].manager.person_id == car2.manager.person_id class RelationshipTest6(fixtures.MappedTest): - """test self-referential relationships on a single joined-table inheritance mapper""" + """test self-referential relationships on a single joined-table + inheritance mapper""" + @classmethod def define_tables(cls, metadata): global people, managers, data people = Table('people', metadata, - Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('person_id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('name', String(50)), ) managers = Table('managers', metadata, - Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True), - Column('colleague_id', Integer, ForeignKey('managers.person_id')), + Column('person_id', Integer, ForeignKey('people.person_id'), + primary_key=True), + Column('colleague_id', Integer, + ForeignKey('managers.person_id')), Column('status', String(30)), ) - def testbasic(self): + def test_basic(self): class Person(AttrSettable): pass class Manager(Person): pass mapper(Person, people) - # relationship is from people.join(managers) -> people.join(managers). self referential logic - # needs to be used to figure out the lazy clause, meaning create_lazy_clause must go from parent.mapped_table - # to parent.mapped_table - mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id, + + mapper(Manager, managers, inherits=Person, + inherit_condition=people.c.person_id==\ + managers.c.person_id, properties={ - 'colleague':relationship(Manager, primaryjoin=managers.c.colleague_id==managers.c.person_id, lazy='select', uselist=False) + 'colleague':relationship(Manager, + primaryjoin=managers.c.colleague_id==\ + managers.c.person_id, + lazy='select', uselist=False) } ) @@ -528,29 +613,36 @@ class RelationshipTest7(fixtures.MappedTest): def define_tables(cls, metadata): global people, engineers, managers, cars, offroad_cars cars = Table('cars', metadata, - Column('car_id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('car_id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('name', String(30))) offroad_cars = Table('offroad_cars', metadata, - Column('car_id',Integer, ForeignKey('cars.car_id'),nullable=False,primary_key=True)) + Column('car_id',Integer, ForeignKey('cars.car_id'), + nullable=False,primary_key=True)) people = Table('people', metadata, - Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True), - Column('car_id', Integer, ForeignKey('cars.car_id'), nullable=False), + Column('person_id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('car_id', Integer, ForeignKey('cars.car_id'), + nullable=False), Column('name', String(50))) engineers = Table('engineers', metadata, - Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True), + Column('person_id', Integer, ForeignKey('people.person_id'), + primary_key=True), Column('field', String(30))) managers = Table('managers', metadata, - Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True), + Column('person_id', Integer, ForeignKey('people.person_id'), + primary_key=True), Column('category', String(70))) @testing.uses_deprecated("fold_equivalents is deprecated.") def test_manytoone_lazyload(self): - """test that lazy load clause to a polymorphic child mapper generates correctly [ticket:493]""" + """test that lazy load clause to a polymorphic child mapper generates + correctly [ticket:493]""" class PersistentObject(object): def __init__(self, **kwargs): @@ -567,19 +659,23 @@ class RelationshipTest7(fixtures.MappedTest): class Engineer(Person): def __repr__(self): - return "Engineer %s, field %s" % (self.name, self.field) + return "Engineer %s, field %s" % (self.name, + self.field) class Manager(Person): def __repr__(self): - return "Manager %s, category %s" % (self.name, self.category) + return "Manager %s, category %s" % (self.name, + self.category) class Car(PersistentObject): def __repr__(self): - return "Car number %d, name %s" % (self.car_id, self.name) + return "Car number %d, name %s" % \ + (self.car_id, self.name) class Offraod_Car(Car): def __repr__(self): - return "Offroad Car number %d, name %s" % (self.car_id,self.name) + return "Offroad Car number %d, name %s" % \ + (self.car_id,self.name) employee_join = polymorphic_union( { @@ -589,7 +685,9 @@ class RelationshipTest7(fixtures.MappedTest): car_join = polymorphic_union( { - 'car' : cars.outerjoin(offroad_cars).select(offroad_cars.c.car_id == None, fold_equivalents=True), + 'car' : cars.outerjoin(offroad_cars).\ + select(offroad_cars.c.car_id == None, + fold_equivalents=True), 'offroad' : cars.join(offroad_cars) }, "type", 'car_join') @@ -597,15 +695,21 @@ class RelationshipTest7(fixtures.MappedTest): with_polymorphic=('*', car_join) ,polymorphic_on=car_join.c.type, polymorphic_identity='car', ) - offroad_car_mapper = mapper(Offraod_Car, offroad_cars, inherits=car_mapper, polymorphic_identity='offroad') + offroad_car_mapper = mapper(Offraod_Car, offroad_cars, + inherits=car_mapper, polymorphic_identity='offroad') person_mapper = mapper(Person, people, - with_polymorphic=('*', employee_join), polymorphic_on=employee_join.c.type, + with_polymorphic=('*', employee_join), + polymorphic_on=employee_join.c.type, polymorphic_identity='person', properties={ 'car':relationship(car_mapper) }) - engineer_mapper = mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer') - manager_mapper = mapper(Manager, managers, inherits=person_mapper, polymorphic_identity='manager') + engineer_mapper = mapper(Engineer, engineers, + inherits=person_mapper, + polymorphic_identity='engineer') + manager_mapper = mapper(Manager, managers, + inherits=person_mapper, + polymorphic_identity='manager') session = create_session() basic_car=Car(name="basic") @@ -616,7 +720,8 @@ class RelationshipTest7(fixtures.MappedTest): car=Car() else: car=Offraod_Car() - session.add(Manager(name="M%d" % i,category="YYYYYYYYY",car=car)) + session.add(Manager(name="M%d" % i, + category="YYYYYYYYY",car=car)) session.add(Engineer(name="E%d" % i,field="X",car=car)) session.flush() session.expunge_all() @@ -630,12 +735,14 @@ class RelationshipTest8(fixtures.MappedTest): def define_tables(cls, metadata): global taggable, users taggable = Table('taggable', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('type', String(30)), Column('owner_id', Integer, ForeignKey('taggable.id')), ) users = Table ('users', metadata, - Column('id', Integer, ForeignKey('taggable.id'), primary_key=True), + Column('id', Integer, ForeignKey('taggable.id'), + primary_key=True), Column('data', String(50)), ) @@ -646,15 +753,19 @@ class RelationshipTest8(fixtures.MappedTest): class User(Taggable): pass - mapper( Taggable, taggable, polymorphic_on=taggable.c.type, polymorphic_identity='taggable', properties = { + mapper( Taggable, taggable, + polymorphic_on=taggable.c.type, + polymorphic_identity='taggable', + properties = { 'owner' : relationship (User, - primaryjoin=taggable.c.owner_id ==taggable.c.id, - remote_side=taggable.c.id - ), + primaryjoin=taggable.c.owner_id ==taggable.c.id, + remote_side=taggable.c.id + ), }) - mapper(User, users, inherits=Taggable, polymorphic_identity='user', + mapper(User, users, inherits=Taggable, + polymorphic_identity='user', inherit_condition=users.c.id == taggable.c.id, ) @@ -685,26 +796,34 @@ class GenerativeTest(fixtures.TestBase, AssertsExecutionResults): metadata = MetaData(testing.db) # table definitions status = Table('status', metadata, - Column('status_id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('status_id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('name', String(20))) people = Table('people', metadata, - Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True), - Column('status_id', Integer, ForeignKey('status.status_id'), nullable=False), + Column('person_id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('status_id', Integer, ForeignKey('status.status_id'), + nullable=False), Column('name', String(50))) engineers = Table('engineers', metadata, - Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True), + Column('person_id', Integer, ForeignKey('people.person_id'), + primary_key=True), Column('field', String(30))) managers = Table('managers', metadata, - Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True), + Column('person_id', Integer, ForeignKey('people.person_id'), + primary_key=True), Column('category', String(70))) cars = Table('cars', metadata, - Column('car_id', Integer, primary_key=True, test_needs_autoincrement=True), - Column('status_id', Integer, ForeignKey('status.status_id'), nullable=False), - Column('owner', Integer, ForeignKey('people.person_id'), nullable=False)) + Column('car_id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('status_id', Integer, ForeignKey('status.status_id'), + nullable=False), + Column('owner', Integer, ForeignKey('people.person_id'), + nullable=False)) metadata.create_all() @@ -730,10 +849,12 @@ class GenerativeTest(fixtures.TestBase, AssertsExecutionResults): return "Ordinary person %s" % self.name class Engineer(Person): def __repr__(self): - return "Engineer %s, field %s, status %s" % (self.name, self.field, self.status) + return "Engineer %s, field %s, status %s" % ( + self.name, self.field, self.status) class Manager(Person): def __repr__(self): - return "Manager %s, category %s, status %s" % (self.name, self.category, self.status) + return "Manager %s, category %s, status %s" % ( + self.name, self.category, self.status) class Car(PersistentObject): def __repr__(self): return "Car number %d" % self.car_id @@ -747,11 +868,19 @@ class GenerativeTest(fixtures.TestBase, AssertsExecutionResults): status_mapper = mapper(Status, status) person_mapper = mapper(Person, people, - with_polymorphic=('*', employee_join), polymorphic_on=employee_join.c.type, - polymorphic_identity='person', properties={'status':relationship(status_mapper)}) - engineer_mapper = mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer') - manager_mapper = mapper(Manager, managers, inherits=person_mapper, polymorphic_identity='manager') - car_mapper = mapper(Car, cars, properties= {'employee':relationship(person_mapper), 'status':relationship(status_mapper)}) + with_polymorphic=('*', employee_join), + polymorphic_on=employee_join.c.type, + polymorphic_identity='person', + properties={'status':relationship(status_mapper)}) + engineer_mapper = mapper(Engineer, engineers, + inherits=person_mapper, + polymorphic_identity='engineer') + manager_mapper = mapper(Manager, managers, + inherits=person_mapper, + polymorphic_identity='manager') + car_mapper = mapper(Car, cars, properties= { + 'employee':relationship(person_mapper), + 'status':relationship(status_mapper)}) session = create_session() @@ -762,22 +891,26 @@ class GenerativeTest(fixtures.TestBase, AssertsExecutionResults): session.add(dead) session.flush() - # TODO: we haven't created assertions for all the data combinations created here + # TODO: we haven't created assertions for all + # the data combinations created here - # creating 5 managers named from M1 to M5 and 5 engineers named from E1 to E5 + # creating 5 managers named from M1 to M5 + # and 5 engineers named from E1 to E5 # M4, M5, E4 and E5 are dead for i in range(1,5): if i<4: st=active else: st=dead - session.add(Manager(name="M%d" % i,category="YYYYYYYYY",status=st)) + session.add(Manager(name="M%d" % i, + category="YYYYYYYYY",status=st)) session.add(Engineer(name="E%d" % i,field="X",status=st)) session.flush() # get E4 - engineer4 = session.query(engineer_mapper).filter_by(name="E4").one() + engineer4 = session.query(engineer_mapper).\ + filter_by(name="E4").one() # create 2 cars for E4, one active and one dead car1 = Car(employee=engineer4,status=active) @@ -791,12 +924,23 @@ class GenerativeTest(fixtures.TestBase, AssertsExecutionResults): e = exists([Car.owner], Car.owner==employee_join.c.person_id) Query(Person)._adapt_clause(employee_join, False, False) - r = session.query(Person).filter(Person.name.like('%2')).join('status').filter_by(name="active").order_by(Person.person_id) - eq_(str(list(r)), "[Manager M2, category YYYYYYYYY, status Status active, Engineer E2, field X, status Status active]") - r = session.query(Engineer).join('status').filter(Person.name.in_(['E2', 'E3', 'E4', 'M4', 'M2', 'M1']) & (status.c.name=="active")).order_by(Person.name) - eq_(str(list(r)), "[Engineer E2, field X, status Status active, Engineer E3, field X, status Status active]") - - r = session.query(Person).filter(exists([1], Car.owner==Person.person_id)) + r = session.query(Person).filter(Person.name.like('%2')).\ + join('status').\ + filter_by(name="active").\ + order_by(Person.person_id) + eq_(str(list(r)), "[Manager M2, category YYYYYYYYY, status " + "Status active, Engineer E2, field X, " + "status Status active]") + r = session.query(Engineer).join('status').\ + filter(Person.name.in_( + ['E2', 'E3', 'E4', 'M4', 'M2', 'M1']) & + (status.c.name=="active")).order_by(Person.name) + eq_(str(list(r)), "[Engineer E2, field X, status Status " + "active, Engineer E3, field X, status " + "Status active]") + + r = session.query(Person).filter(exists([1], + Car.owner==Person.person_id)) eq_(str(list(r)), "[Engineer E4, field X, status Status dead]") class MultiLevelTest(fixtures.MappedTest): @@ -805,41 +949,48 @@ class MultiLevelTest(fixtures.MappedTest): global table_Employee, table_Engineer, table_Manager table_Employee = Table( 'Employee', metadata, Column( 'name', type_= String(100), ), - Column( 'id', primary_key= True, type_= Integer, test_needs_autoincrement=True), + Column( 'id', primary_key= True, type_= Integer, + test_needs_autoincrement=True), Column( 'atype', type_= String(100), ), ) table_Engineer = Table( 'Engineer', metadata, Column( 'machine', type_= String(100), ), - Column( 'id', Integer, ForeignKey( 'Employee.id', ), primary_key= True, ), + Column( 'id', Integer, ForeignKey( 'Employee.id', ), + primary_key= True), ) table_Manager = Table( 'Manager', metadata, Column( 'duties', type_= String(100), ), - Column( 'id', Integer, ForeignKey( 'Engineer.id', ), primary_key= True, ), + Column( 'id', Integer, ForeignKey( 'Engineer.id', ), + primary_key= True, ), ) + def test_threelevels(self): class Employee( object): def set( me, **kargs): for k,v in kargs.iteritems(): setattr( me, k, v) return me - def __str__(me): return str(me.__class__.__name__)+':'+str(me.name) + def __str__(me): + return str(me.__class__.__name__)+':'+str(me.name) __repr__ = __str__ - class Engineer( Employee): pass - class Manager( Engineer): pass + class Engineer(Employee): + pass + class Manager(Engineer): + pass pu_Employee = polymorphic_union( { - 'Manager': table_Employee.join( table_Engineer).join( table_Manager), - 'Engineer': select([table_Employee, table_Engineer.c.machine], table_Employee.c.atype == 'Engineer', from_obj=[table_Employee.join(table_Engineer)]), - 'Employee': table_Employee.select( table_Employee.c.atype == 'Employee'), + 'Manager': table_Employee.join( + table_Engineer).join( table_Manager), + 'Engineer': select([table_Employee, + table_Engineer.c.machine], + table_Employee.c.atype == 'Engineer', + from_obj=[ + table_Employee.join(table_Engineer)]), + 'Employee': table_Employee.select( + table_Employee.c.atype == 'Employee'), }, None, 'pu_employee', ) -# pu_Employee = polymorphic_union( { -# 'Manager': table_Employee.join( table_Engineer).join( table_Manager), -# 'Engineer': table_Employee.join(table_Engineer).select(table_Employee.c.atype == 'Engineer'), -# 'Employee': table_Employee.select( table_Employee.c.atype == 'Employee'), -# }, None, 'pu_employee', ) - mapper_Employee = mapper( Employee, table_Employee, polymorphic_identity= 'Employee', polymorphic_on= pu_Employee.c.atype, @@ -847,11 +998,18 @@ class MultiLevelTest(fixtures.MappedTest): ) pu_Engineer = polymorphic_union( { - 'Manager': table_Employee.join( table_Engineer).join( table_Manager), - 'Engineer': select([table_Employee, table_Engineer.c.machine], table_Employee.c.atype == 'Engineer', from_obj=[table_Employee.join(table_Engineer)]), + 'Manager': table_Employee.join( table_Engineer). + join( table_Manager), + 'Engineer': select([table_Employee, + table_Engineer.c.machine], + table_Employee.c.atype == 'Engineer', + from_obj=[ + table_Employee.join(table_Engineer) + ]), }, None, 'pu_engineer', ) mapper_Engineer = mapper( Engineer, table_Engineer, - inherit_condition= table_Engineer.c.id == table_Employee.c.id, + inherit_condition= table_Engineer.c.id == \ + table_Employee.c.id, inherits= mapper_Employee, polymorphic_identity= 'Engineer', polymorphic_on= pu_Engineer.c.atype, @@ -859,14 +1017,16 @@ class MultiLevelTest(fixtures.MappedTest): ) mapper_Manager = mapper( Manager, table_Manager, - inherit_condition= table_Manager.c.id == table_Engineer.c.id, + inherit_condition= table_Manager.c.id == \ + table_Engineer.c.id, inherits= mapper_Engineer, polymorphic_identity= 'Manager', ) a = Employee().set( name= 'one') b = Engineer().set( egn= 'two', machine= 'any') - c = Manager().set( name= 'head', machine= 'fast', duties= 'many') + c = Manager().set( name= 'head', machine= 'fast', + duties= 'many') session = create_session() session.add(a) @@ -880,16 +1040,19 @@ class MultiLevelTest(fixtures.MappedTest): class ManyToManyPolyTest(fixtures.MappedTest): @classmethod def define_tables(cls, metadata): - global base_item_table, item_table, base_item_collection_table, collection_table + global base_item_table, item_table, base_item_collection_table, \ + collection_table base_item_table = Table( 'base_item', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('child_name', String(255), default=None)) item_table = Table( 'item', metadata, - Column('id', Integer, ForeignKey('base_item.id'), primary_key=True), - Column('dummy', Integer, default=0)) # Dummy column to avoid weird insert problems + Column('id', Integer, ForeignKey('base_item.id'), + primary_key=True), + Column('dummy', Integer, default=0)) base_item_collection_table = Table( 'base_item_collection', metadata, @@ -898,18 +1061,21 @@ class ManyToManyPolyTest(fixtures.MappedTest): collection_table = Table( 'collection', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('name', Unicode(255))) def test_pjoin_compile(self): - """test that remote_side columns in the secondary join table arent attempted to be - matched to the target polymorphic selectable""" + """test that remote_side columns in the secondary join table + arent attempted to be matched to the target polymorphic + selectable""" class BaseItem(object): pass class Item(BaseItem): pass class Collection(object): pass item_join = polymorphic_union( { - 'BaseItem':base_item_table.select(base_item_table.c.child_name=='BaseItem'), + 'BaseItem':base_item_table.select( + base_item_table.c.child_name=='BaseItem'), 'Item':base_item_table.join(item_table), }, None, 'item_join') @@ -918,7 +1084,9 @@ class ManyToManyPolyTest(fixtures.MappedTest): with_polymorphic=('*', item_join), polymorphic_on=base_item_table.c.child_name, polymorphic_identity='BaseItem', - properties=dict(collections=relationship(Collection, secondary=base_item_collection_table, backref="items"))) + properties=dict(collections=relationship(Collection, + secondary=base_item_collection_table, + backref="items"))) mapper( Item, item_table, @@ -934,7 +1102,8 @@ class CustomPKTest(fixtures.MappedTest): def define_tables(cls, metadata): global t1, t2 t1 = Table('t1', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('type', String(30), nullable=False), Column('data', String(30))) # note that the primary key column in t2 is named differently @@ -943,7 +1112,8 @@ class CustomPKTest(fixtures.MappedTest): Column('t2data', String(30))) def test_custompk(self): - """test that the primary_key attribute is propagated to the polymorphic mapper""" + """test that the primary_key attribute is propagated to the + polymorphic mapper""" class T1(object):pass class T2(T1):pass @@ -957,9 +1127,11 @@ class CustomPKTest(fixtures.MappedTest): d['t2'] = t1.join(t2) pjoin = polymorphic_union(d, None, 'pjoin') - mapper(T1, t1, polymorphic_on=t1.c.type, polymorphic_identity='t1', with_polymorphic=('*', pjoin), primary_key=[pjoin.c.id]) + mapper(T1, t1, polymorphic_on=t1.c.type, + polymorphic_identity='t1', + with_polymorphic=('*', pjoin), + primary_key=[pjoin.c.id]) mapper(T2, t2, inherits=T1, polymorphic_identity='t2') - print [str(c) for c in class_mapper(T1).primary_key] ot1 = T1() ot2 = T2() sess = create_session() @@ -968,7 +1140,8 @@ class CustomPKTest(fixtures.MappedTest): sess.flush() sess.expunge_all() - # query using get(), using only one value. this requires the select_table mapper + # query using get(), using only one value. + # this requires the select_table mapper # has the same single-col primary key. assert sess.query(T1).get(ot1.id).id == ot1.id @@ -977,8 +1150,8 @@ class CustomPKTest(fixtures.MappedTest): sess.flush() def test_pk_collapses(self): - """test that a composite primary key attribute formed by a join is "collapsed" into its - minimal columns""" + """test that a composite primary key attribute formed by a join + is "collapsed" into its minimal columns""" class T1(object):pass class T2(T1):pass @@ -992,11 +1165,12 @@ class CustomPKTest(fixtures.MappedTest): d['t2'] = t1.join(t2) pjoin = polymorphic_union(d, None, 'pjoin') - mapper(T1, t1, polymorphic_on=t1.c.type, polymorphic_identity='t1', with_polymorphic=('*', pjoin)) + mapper(T1, t1, polymorphic_on=t1.c.type, + polymorphic_identity='t1', + with_polymorphic=('*', pjoin)) mapper(T2, t2, inherits=T1, polymorphic_identity='t2') assert len(class_mapper(T1).primary_key) == 1 - print [str(c) for c in class_mapper(T1).primary_key] ot1 = T1() ot2 = T2() sess = create_session() @@ -1005,7 +1179,8 @@ class CustomPKTest(fixtures.MappedTest): sess.flush() sess.expunge_all() - # query using get(), using only one value. this requires the select_table mapper + # query using get(), using only one value. this requires the + # select_table mapper # has the same single-col primary key. assert sess.query(T1).get(ot1.id).id == ot1.id @@ -1019,27 +1194,33 @@ class InheritingEagerTest(fixtures.MappedTest): global people, employees, tags, peopleTags people = Table('people', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('_type', String(30), nullable=False), ) employees = Table('employees', metadata, - Column('id', Integer, ForeignKey('people.id'),primary_key=True), + Column('id', Integer, ForeignKey('people.id'), + primary_key=True), ) tags = Table('tags', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('label', String(50), nullable=False), ) peopleTags = Table('peopleTags', metadata, - Column('person_id', Integer,ForeignKey('people.id')), - Column('tag_id', Integer,ForeignKey('tags.id')), + Column('person_id', Integer, + ForeignKey('people.id')), + Column('tag_id', Integer, + ForeignKey('tags.id')), ) def test_basic(self): - """test that Query uses the full set of mapper._eager_loaders when generating SQL""" + """test that Query uses the full set of mapper._eager_loaders + when generating SQL""" class Person(fixtures.ComparableEntity): pass @@ -1052,10 +1233,14 @@ class InheritingEagerTest(fixtures.MappedTest): def __init__(self, label): self.label = label - mapper(Person, people, polymorphic_on=people.c._type,polymorphic_identity='person', properties={ - 'tags': relationship(Tag, secondary=peopleTags,backref='people', lazy='joined') + mapper(Person, people, polymorphic_on=people.c._type, + polymorphic_identity='person', properties={ + 'tags': relationship(Tag, + secondary=peopleTags, + backref='people', lazy='joined') }) - mapper(Employee, employees, inherits=Person,polymorphic_identity='employee') + mapper(Employee, employees, inherits=Person, + polymorphic_identity='employee') mapper(Tag, tags) session = create_session() @@ -1072,7 +1257,8 @@ class InheritingEagerTest(fixtures.MappedTest): session.expunge_all() # query from Employee with limit, query needs to apply eager limiting subquery - instance = session.query(Employee).filter_by(id=1).limit(1).first() + instance = session.query(Employee).\ + filter_by(id=1).limit(1).first() assert len(instance.tags) == 2 class MissingPolymorphicOnTest(fixtures.MappedTest): @@ -1240,8 +1426,10 @@ class JoinedInhAdjacencyTest(fixtures.MappedTest): self._roundtrip() def test_joined_subclass_to_superclass(self): - people, users, dudes = self.tables.people, self.tables.users, self.tables.dudes - Person, User, Dude = self.classes.Person, self.classes.User, self.classes.Dude + people, users, dudes = self.tables.people, self.tables.users, \ + self.tables.dudes + Person, User, Dude = self.classes.Person, self.classes.User, \ + self.classes.Dude mapper(Person, people, polymorphic_on=people.c.type, diff --git a/test/orm/inheritance/test_polymorph.py b/test/orm/inheritance/test_poly_persistence.py similarity index 60% rename from test/orm/inheritance/test_polymorph.py rename to test/orm/inheritance/test_poly_persistence.py index a9d2c2eb56..6939479b11 100644 --- a/test/orm/inheritance/test_polymorph.py +++ b/test/orm/inheritance/test_poly_persistence.py @@ -27,30 +27,36 @@ class PolymorphTest(fixtures.MappedTest): global companies, people, engineers, managers, boss companies = Table('companies', metadata, - Column('company_id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('company_id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('name', String(50))) people = Table('people', metadata, - Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True), - Column('company_id', Integer, ForeignKey('companies.company_id'), nullable=False), + Column('person_id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('company_id', Integer, ForeignKey('companies.company_id'), + nullable=False), Column('name', String(50)), Column('type', String(30))) engineers = Table('engineers', metadata, - Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True), + Column('person_id', Integer, ForeignKey('people.person_id'), + primary_key=True), Column('status', String(30)), Column('engineer_name', String(50)), Column('primary_language', String(50)), ) managers = Table('managers', metadata, - Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True), + Column('person_id', Integer, ForeignKey('people.person_id'), + primary_key=True), Column('status', String(30)), Column('manager_name', String(50)) ) boss = Table('boss', metadata, - Column('boss_id', Integer, ForeignKey('managers.person_id'), primary_key=True), + Column('boss_id', Integer, ForeignKey('managers.person_id'), + primary_key=True), Column('golf_swing', String(30)), ) @@ -73,8 +79,10 @@ class InsertOrderTest(PolymorphTest): polymorphic_on=person_join.c.type, polymorphic_identity='person') - mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer') - mapper(Manager, managers, inherits=person_mapper, polymorphic_identity='manager') + mapper(Engineer, engineers, inherits=person_mapper, + polymorphic_identity='engineer') + mapper(Manager, managers, inherits=person_mapper, + polymorphic_identity='manager') mapper(Company, companies, properties={ 'employees': relationship(Person, backref='company', @@ -102,12 +110,19 @@ class InsertOrderTest(PolymorphTest): class RoundTripTest(PolymorphTest): pass -def _generate_round_trip_test(include_base, lazy_relationship, redefine_colprop, with_polymorphic): +def _generate_round_trip_test(include_base, lazy_relationship, + redefine_colprop, with_polymorphic): """generates a round trip test. - - include_base - whether or not to include the base 'person' type in the union. - lazy_relationship - whether or not the Company relationship to People is lazy or eager. - redefine_colprop - if we redefine the 'name' column to be 'people_name' on the base Person class + + include_base - whether or not to include the base 'person' type in + the union. + + lazy_relationship - whether or not the Company relationship to + People is lazy or eager. + + redefine_colprop - if we redefine the 'name' column to be + 'people_name' on the base Person class + use_literal_join - primary join condition is explicitly specified """ def test_roundtrip(self): @@ -130,7 +145,8 @@ def _generate_round_trip_test(include_base, lazy_relationship, redefine_colprop, person_with_polymorphic = ['*', person_join] manager_with_polymorphic = ['*', manager_join] elif with_polymorphic == 'joins': - person_join = people.outerjoin(engineers).outerjoin(managers).outerjoin(boss) + person_join = people.outerjoin(engineers).outerjoin(managers).\ + outerjoin(boss) manager_join = people.join(managers).outerjoin(boss) person_with_polymorphic = ['*', person_join] manager_with_polymorphic = ['*', manager_join] @@ -143,18 +159,21 @@ def _generate_round_trip_test(include_base, lazy_relationship, redefine_colprop, if redefine_colprop: person_mapper = mapper(Person, people, - with_polymorphic=person_with_polymorphic, - polymorphic_on=people.c.type, - polymorphic_identity='person', - properties= {'person_name':people.c.name}) + with_polymorphic=person_with_polymorphic, + polymorphic_on=people.c.type, + polymorphic_identity='person', + properties= {'person_name':people.c.name}) else: person_mapper = mapper(Person, people, - with_polymorphic=person_with_polymorphic, - polymorphic_on=people.c.type, - polymorphic_identity='person') + with_polymorphic=person_with_polymorphic, + polymorphic_on=people.c.type, + polymorphic_identity='person') - mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer') - mapper(Manager, managers, inherits=person_mapper, with_polymorphic=manager_with_polymorphic, polymorphic_identity='manager') + mapper(Engineer, engineers, inherits=person_mapper, + polymorphic_identity='engineer') + mapper(Manager, managers, inherits=person_mapper, + with_polymorphic=manager_with_polymorphic, + polymorphic_identity='manager') mapper(Boss, boss, inherits=Manager, polymorphic_identity='boss') @@ -171,14 +190,20 @@ def _generate_round_trip_test(include_base, lazy_relationship, redefine_colprop, person_attribute_name = 'name' employees = [ - Manager(status='AAB', manager_name='manager1', **{person_attribute_name:'pointy haired boss'}), - Engineer(status='BBA', engineer_name='engineer1', primary_language='java', **{person_attribute_name:'dilbert'}), + Manager(status='AAB', manager_name='manager1', + **{person_attribute_name:'pointy haired boss'}), + Engineer(status='BBA', engineer_name='engineer1', + primary_language='java', + **{person_attribute_name:'dilbert'}), ] if include_base: employees.append(Person(**{person_attribute_name:'joesmith'})) employees += [ - Engineer(status='CGG', engineer_name='engineer2', primary_language='python', **{person_attribute_name:'wally'}), - Manager(status='ABA', manager_name='manager2', **{person_attribute_name:'jsmith'}) + Engineer(status='CGG', engineer_name='engineer2', + primary_language='python', + **{person_attribute_name:'wally'}), + Manager(status='ABA', manager_name='manager2', + **{person_attribute_name:'jsmith'}) ] pointy = employees[0] @@ -196,7 +221,9 @@ def _generate_round_trip_test(include_base, lazy_relationship, redefine_colprop, eq_(session.query(Person).get(dilbert.person_id), dilbert) session.expunge_all() - eq_(session.query(Person).filter(Person.person_id==dilbert.person_id).one(), dilbert) + eq_(session.query(Person).filter( + Person.person_id==dilbert.person_id).one(), + dilbert) session.expunge_all() def go(): @@ -215,56 +242,81 @@ def _generate_round_trip_test(include_base, lazy_relationship, redefine_colprop, else: self.assert_sql_count(testing.db, go, 6) - # test selecting from the query, using the base mapped table (people) as the selection criterion. - # in the case of the polymorphic Person query, the "people" selectable should be adapted to be "person_join" + # test selecting from the query, using the base + # mapped table (people) as the selection criterion. + # in the case of the polymorphic Person query, + # the "people" selectable should be adapted to be "person_join" eq_( - session.query(Person).filter(getattr(Person, person_attribute_name)=='dilbert').first(), + session.query(Person).filter( + getattr(Person, person_attribute_name)=='dilbert' + ).first(), dilbert ) - assert session.query(Person).filter(getattr(Person, person_attribute_name)=='dilbert').first().person_id + assert session.query(Person).filter( + getattr(Person, person_attribute_name)=='dilbert' + ).first().person_id eq_( - session.query(Engineer).filter(getattr(Person, person_attribute_name)=='dilbert').first(), + session.query(Engineer).filter( + getattr(Person, person_attribute_name)=='dilbert' + ).first(), dilbert ) - # test selecting from the query, joining against an alias of the base "people" table. test that - # the "palias" alias does *not* get sucked up into the "person_join" conversion. + # test selecting from the query, joining against + # an alias of the base "people" table. test that + # the "palias" alias does *not* get sucked up + # into the "person_join" conversion. palias = people.alias("palias") dilbert = session.query(Person).get(dilbert.person_id) - assert dilbert is session.query(Person).filter((palias.c.name=='dilbert') & (palias.c.person_id==Person.person_id)).first() - assert dilbert is session.query(Engineer).filter((palias.c.name=='dilbert') & (palias.c.person_id==Person.person_id)).first() - assert dilbert is session.query(Person).filter((Engineer.engineer_name=="engineer1") & (engineers.c.person_id==people.c.person_id)).first() - assert dilbert is session.query(Engineer).filter(Engineer.engineer_name=="engineer1")[0] - - dilbert.engineer_name = 'hes dibert!' + assert dilbert is session.query(Person).filter( + (palias.c.name=='dilbert') & \ + (palias.c.person_id==Person.person_id)).first() + assert dilbert is session.query(Engineer).filter( + (palias.c.name=='dilbert') & \ + (palias.c.person_id==Person.person_id)).first() + assert dilbert is session.query(Person).filter( + (Engineer.engineer_name=="engineer1") & \ + (engineers.c.person_id==people.c.person_id) + ).first() + assert dilbert is session.query(Engineer).\ + filter(Engineer.engineer_name=="engineer1")[0] session.flush() session.expunge_all() def go(): - session.query(Person).filter(getattr(Person, person_attribute_name)=='dilbert').first() + session.query(Person).filter(getattr(Person, + person_attribute_name)=='dilbert').first() self.assert_sql_count(testing.db, go, 1) session.expunge_all() - dilbert = session.query(Person).filter(getattr(Person, person_attribute_name)=='dilbert').first() + dilbert = session.query(Person).filter(getattr(Person, + person_attribute_name)=='dilbert').first() def go(): - # assert that only primary table is queried for already-present-in-session - d = session.query(Person).filter(getattr(Person, person_attribute_name)=='dilbert').first() + # assert that only primary table is queried for + # already-present-in-session + d = session.query(Person).filter(getattr(Person, + person_attribute_name)=='dilbert').first() self.assert_sql_count(testing.db, go, 1) # test standalone orphans - daboss = Boss(status='BBB', manager_name='boss', golf_swing='fore', **{person_attribute_name:'daboss'}) + daboss = Boss(status='BBB', + manager_name='boss', + golf_swing='fore', + **{person_attribute_name:'daboss'}) session.add(daboss) assert_raises(sa_exc.DBAPIError, session.flush) c = session.query(Company).first() daboss.company = c - manager_list = [e for e in c.employees if isinstance(e, Manager)] + manager_list = [e for e in c.employees + if isinstance(e, Manager)] session.flush() session.expunge_all() - eq_(session.query(Manager).order_by(Manager.person_id).all(), manager_list) + eq_(session.query(Manager).order_by(Manager.person_id).all(), + manager_list) c = session.query(Company).first() session.delete(c) @@ -285,7 +337,11 @@ for lazy_relationship in [True, False]: for with_polymorphic in ['unions', 'joins', 'auto', 'none']: if with_polymorphic == 'unions': for include_base in [True, False]: - _generate_round_trip_test(include_base, lazy_relationship, redefine_colprop, with_polymorphic) + _generate_round_trip_test(include_base, + lazy_relationship, + redefine_colprop, with_polymorphic) else: - _generate_round_trip_test(False, lazy_relationship, redefine_colprop, with_polymorphic) + _generate_round_trip_test(False, + lazy_relationship, + redefine_colprop, with_polymorphic) diff --git a/test/orm/inheritance/test_query.py b/test/orm/inheritance/test_polymorphic_rel.py similarity index 56% rename from test/orm/inheritance/test_query.py rename to test/orm/inheritance/test_polymorphic_rel.py index e9e9949e99..2e599a9dc3 100644 --- a/test/orm/inheritance/test_query.py +++ b/test/orm/inheritance/test_polymorphic_rel.py @@ -1,6 +1,8 @@ -from sqlalchemy import * -from sqlalchemy.orm import * -from sqlalchemy.orm import interfaces +from sqlalchemy import Integer, String, ForeignKey, func, desc, and_ +from sqlalchemy.orm import interfaces, relationship, mapper, \ + clear_mappers, create_session, joinedload, joinedload_all, \ + subqueryload, subqueryload_all, polymorphic_union, aliased,\ + class_mapper from sqlalchemy import exc as sa_exc from sqlalchemy.engine import default @@ -247,7 +249,7 @@ def _produce_test(select_type): count = {'':14, 'Polymorphic':9}.get(select_type, 10) self.assert_sql_count(testing.db, go, count) - def test_primary_eager_aliasing(self): + def test_primary_eager_aliasing_one(self): # For both joinedload() and subqueryload(), if the original q is # not loading the subclass table, the joinedload doesn't happen. @@ -259,6 +261,7 @@ def _produce_test(select_type): count = {'':6, 'Polymorphic':3}.get(select_type, 4) self.assert_sql_count(testing.db, go, count) + def test_primary_eager_aliasing_two(self): sess = create_session() def go(): eq_(sess.query(Person) @@ -267,6 +270,8 @@ def _produce_test(select_type): count = {'':14, 'Polymorphic':7}.get(select_type, 8) self.assert_sql_count(testing.db, go, count) + def test_primary_eager_aliasing_three(self): + # assert the JOINs don't over JOIN sess = create_session() @@ -282,7 +287,7 @@ def _produce_test(select_type): .subquery().count().scalar(), 2) - def test_get(self): + def test_get_one(self): """ For all mappers, ensure the primary key has been calculated as just the "person_id" column. @@ -290,8 +295,14 @@ def _produce_test(select_type): sess = create_session() eq_(sess.query(Person).get(e1.person_id), Engineer(name="dilbert", primary_language="java")) + + def test_get_two(self): + sess = create_session() eq_(sess.query(Engineer).get(e1.person_id), Engineer(name="dilbert", primary_language="java")) + + def test_get_three(self): + sess = create_session() eq_(sess.query(Manager).get(b1.person_id), Boss(name="pointy haired boss", golf_swing="fore")) @@ -328,84 +339,169 @@ def _produce_test(select_type): ) ]) - def test_filter_on_subclass(self): + def test_filter_on_subclass_one(self): sess = create_session() eq_(sess.query(Engineer).all()[0], Engineer(name="dilbert")) + + def test_filter_on_subclass_two(self): + sess = create_session() eq_(sess.query(Engineer).first(), Engineer(name="dilbert")) + + def test_filter_on_subclass_three(self): + sess = create_session() eq_(sess.query(Engineer) .filter(Engineer.person_id == e1.person_id).first(), Engineer(name="dilbert")) + + def test_filter_on_subclass_four(self): + sess = create_session() eq_(sess.query(Manager) .filter(Manager.person_id == m1.person_id).one(), Manager(name="dogbert")) + + def test_filter_on_subclass_five(self): + sess = create_session() eq_(sess.query(Manager) .filter(Manager.person_id == b1.person_id).one(), Boss(name="pointy haired boss")) + + def test_filter_on_subclass_six(self): + sess = create_session() eq_(sess.query(Boss) .filter(Boss.person_id == b1.person_id).one(), Boss(name="pointy haired boss")) - def test_join_from_polymorphic(self): + def test_join_from_polymorphic_nonaliased_one(self): sess = create_session() - for x in (True, False): - eq_(sess.query(Person) - .join('paperwork', aliased=x) - .filter(Paperwork.description.like('%review%')).all(), - [b1, m1]) - eq_(sess.query(Person) - .join('paperwork', aliased=x) - .filter(Paperwork.description.like('%#2%')).all(), - [e1, m1]) - eq_(sess.query(Engineer) - .join('paperwork', aliased=x) - .filter(Paperwork.description.like('%#2%')).all(), - [e1]) - eq_(sess.query(Person) - .join('paperwork', aliased=x) - .filter(Person.name.like('%dog%')) - .filter(Paperwork.description.like('%#2%')).all(), - [m1]) + eq_(sess.query(Person) + .join('paperwork', aliased=False) + .filter(Paperwork.description.like('%review%')).all(), + [b1, m1]) - def test_join_from_with_polymorphic(self): + def test_join_from_polymorphic_nonaliased_two(self): sess = create_session() - for aliased in (True, False): - sess.expunge_all() - eq_(sess.query(Person) - .with_polymorphic(Manager) - .join('paperwork', aliased=aliased) - .filter(Paperwork.description.like('%review%')).all(), - [b1, m1]) - sess.expunge_all() - eq_(sess.query(Person) - .with_polymorphic([Manager, Engineer]) - .join('paperwork', aliased=aliased) - .filter(Paperwork.description.like('%#2%')).all(), - [e1, m1]) - sess.expunge_all() - eq_(sess.query(Person) - .with_polymorphic([Manager, Engineer]) - .join('paperwork', aliased=aliased) - .filter(Person.name.like('%dog%')) - .filter(Paperwork.description.like('%#2%')).all(), - [m1]) + eq_(sess.query(Person) + .join('paperwork', aliased=False) + .filter(Paperwork.description.like('%#2%')).all(), + [e1, m1]) + + def test_join_from_polymorphic_nonaliased_three(self): + sess = create_session() + eq_(sess.query(Engineer) + .join('paperwork', aliased=False) + .filter(Paperwork.description.like('%#2%')).all(), + [e1]) + + def test_join_from_polymorphic_nonaliased_four(self): + sess = create_session() + eq_(sess.query(Person) + .join('paperwork', aliased=False) + .filter(Person.name.like('%dog%')) + .filter(Paperwork.description.like('%#2%')).all(), + [m1]) + + def test_join_from_polymorphic_aliased_one(self): + sess = create_session() + eq_(sess.query(Person) + .join('paperwork', aliased=True) + .filter(Paperwork.description.like('%review%')).all(), + [b1, m1]) + + def test_join_from_polymorphic_aliased_two(self): + sess = create_session() + eq_(sess.query(Person) + .join('paperwork', aliased=True) + .filter(Paperwork.description.like('%#2%')).all(), + [e1, m1]) + + def test_join_from_polymorphic_aliased_three(self): + sess = create_session() + eq_(sess.query(Engineer) + .join('paperwork', aliased=True) + .filter(Paperwork.description.like('%#2%')).all(), + [e1]) + + def test_join_from_polymorphic_aliased_four(self): + sess = create_session() + eq_(sess.query(Person) + .join('paperwork', aliased=True) + .filter(Person.name.like('%dog%')) + .filter(Paperwork.description.like('%#2%')).all(), + [m1]) + + def test_join_from_with_polymorphic_nonaliased_one(self): + sess = create_session() + eq_(sess.query(Person) + .with_polymorphic(Manager) + .join('paperwork') + .filter(Paperwork.description.like('%review%')).all(), + [b1, m1]) + + def test_join_from_with_polymorphic_nonaliased_two(self): + sess = create_session() + eq_(sess.query(Person) + .with_polymorphic([Manager, Engineer]) + .join('paperwork') + .filter(Paperwork.description.like('%#2%')).all(), + [e1, m1]) + + def test_join_from_with_polymorphic_nonaliased_three(self): + sess = create_session() + eq_(sess.query(Person) + .with_polymorphic([Manager, Engineer]) + .join('paperwork') + .filter(Person.name.like('%dog%')) + .filter(Paperwork.description.like('%#2%')).all(), + [m1]) + + + def test_join_from_with_polymorphic_aliased_one(self): + sess = create_session() + eq_(sess.query(Person) + .with_polymorphic(Manager) + .join('paperwork', aliased=True) + .filter(Paperwork.description.like('%review%')).all(), + [b1, m1]) - def test_join_to_polymorphic(self): + def test_join_from_with_polymorphic_aliased_two(self): + sess = create_session() + eq_(sess.query(Person) + .with_polymorphic([Manager, Engineer]) + .join('paperwork', aliased=True) + .filter(Paperwork.description.like('%#2%')).all(), + [e1, m1]) + + def test_join_from_with_polymorphic_aliased_three(self): + sess = create_session() + eq_(sess.query(Person) + .with_polymorphic([Manager, Engineer]) + .join('paperwork', aliased=True) + .filter(Person.name.like('%dog%')) + .filter(Paperwork.description.like('%#2%')).all(), + [m1]) + + def test_join_to_polymorphic_nonaliased(self): sess = create_session() eq_(sess.query(Company) .join('employees') .filter(Person.name == 'vlad').one(), c2) + + def test_join_to_polymorphic_aliased(self): + sess = create_session() eq_(sess.query(Company) .join('employees', aliased=True) .filter(Person.name == 'vlad').one(), c2) - def test_polymorphic_any(self): + def test_polymorphic_any_one(self): sess = create_session() any_ = Company.employees.any(Person.name == 'vlad') eq_(sess.query(Company).filter(any_).all(), [c2]) + def test_polymorphic_any_two(self): + sess = create_session() # test that the aliasing on "Person" does not bleed into the # EXISTS clause generated by any() any_ = Company.employees.any(Person.name == 'wally') @@ -415,6 +511,8 @@ def _produce_test(select_type): .filter(any_).all(), [c1]) + def test_polymorphic_any_three(self): + sess = create_session() any_ = Company.employees.any(Person.name == 'vlad') eq_(sess.query(Company) .join(Company.employees, aliased=True) @@ -422,37 +520,51 @@ def _produce_test(select_type): .filter(any_).all(), []) + def test_polymorphic_any_four(self): + sess = create_session() any_ = Company.employees.of_type(Engineer).any( Engineer.primary_language == 'cobol') eq_(sess.query(Company).filter(any_).one(), c2) + def test_polymorphic_any_five(self): + sess = create_session() calias = aliased(Company) any_ = calias.employees.of_type(Engineer).any( Engineer.primary_language == 'cobol') eq_(sess.query(calias).filter(any_).one(), c2) + def test_polymorphic_any_six(self): + sess = create_session() any_ = Company.employees.of_type(Boss).any( Boss.golf_swing == 'fore') eq_(sess.query(Company).filter(any_).one(), c1) + def test_polymorphic_any_seven(self): + sess = create_session() any_ = Company.employees.of_type(Boss).any( Manager.manager_name == 'pointy') eq_(sess.query(Company).filter(any_).one(), c1) + def test_polymorphic_any_eight(self): + sess = create_session() if select_type != '': any_ = Engineer.machines.any( Machine.name == "Commodore 64") eq_(sess.query(Person).filter(any_).all(), [e2, e3]) + def test_polymorphic_any_nine(self): + sess = create_session() any_ = Person.paperwork.any( Paperwork.description == "review #2") eq_(sess.query(Person).filter(any_).all(), [m1]) + def test_polymorphic_any_ten(self): + sess = create_session() any_ = Company.employees.of_type(Engineer).any( and_(Engineer.primary_language == 'cobol')) eq_(sess.query(Company).filter(any_).one(), c2) - def test_join_from_columns_or_subclass(self): + def test_join_from_columns_or_subclass_one(self): sess = create_session() expected = [ @@ -462,6 +574,8 @@ def _produce_test(select_type): .order_by(Manager.name).all(), expected) + def test_join_from_columns_or_subclass_two(self): + sess = create_session() expected = [ (u'dogbert',), (u'dogbert',), @@ -471,6 +585,8 @@ def _produce_test(select_type): .order_by(Manager.name).all(), expected) + def test_join_from_columns_or_subclass_three(self): + sess = create_session() expected = [ (u'dilbert',), (u'dilbert',), @@ -485,6 +601,8 @@ def _produce_test(select_type): .order_by(Person.name).all(), expected) + def test_join_from_columns_or_subclass_four(self): + sess = create_session() # Load Person.name, joining from Person -> paperwork, get all # the people. expected = [ @@ -502,6 +620,8 @@ def _produce_test(select_type): .order_by(Person.name).all(), expected) + def test_join_from_columns_or_subclass_five(self): + sess = create_session() # same, on manager. get only managers. expected = [ (u'dogbert',), @@ -513,6 +633,8 @@ def _produce_test(select_type): .order_by(Person.name).all(), expected) + def test_join_from_columns_or_subclass_six(self): + sess = create_session() if select_type == '': # this now raises, due to [ticket:1892]. Manager.person_id # is now the "person_id" column on Manager. SQL is incorrect. @@ -553,11 +675,15 @@ def _produce_test(select_type): .order_by(Person.name).all(), expected) + def test_join_from_columns_or_subclass_seven(self): + sess = create_session() eq_(sess.query(Manager) .join(Paperwork, Manager.paperwork) .order_by(Manager.name).all(), [m1, b1]) + def test_join_from_columns_or_subclass_eight(self): + sess = create_session() expected = [ (u'dogbert',), (u'dogbert',), @@ -568,12 +694,16 @@ def _produce_test(select_type): .order_by(Manager.name).all(), expected) + def test_join_from_columns_or_subclass_nine(self): + sess = create_session() eq_(sess.query(Manager.person_id) .join(paperwork, Manager.person_id == paperwork.c.person_id) .order_by(Manager.name).all(), [(4,), (4,), (3,)]) + def test_join_from_columns_or_subclass_ten(self): + sess = create_session() expected = [ (u'pointy haired boss', u'review #1'), (u'dogbert', u'review #2'), @@ -584,6 +714,8 @@ def _produce_test(select_type): .order_by(Paperwork.paperwork_id).all(), expected) + def test_join_from_columns_or_subclass_eleven(self): + sess = create_session() expected = [ (u'pointy haired boss',), (u'dogbert',), @@ -640,19 +772,8 @@ def _produce_test(select_type): sess.expire(m2, ['manager_name', 'golf_swing']) assert m2.golf_swing == 'fore' - def test_with_polymorphic(self): - sess = create_session() - - assert_raises(sa_exc.InvalidRequestError, - sess.query(Person).with_polymorphic, Paperwork) - assert_raises(sa_exc.InvalidRequestError, - sess.query(Engineer).with_polymorphic, Boss) - assert_raises(sa_exc.InvalidRequestError, - sess.query(Engineer).with_polymorphic, Person) - - # compare to entities without related collections to prevent - # additional lazy SQL from firing on loaded entities - emps_without_relationships = [ + def _emps_wo_relationships_fixture(self): + return [ Engineer( name="dilbert", engineer_name="dilbert", @@ -678,49 +799,71 @@ def _produce_test(select_type): primary_language="cobol", status="elbonian engineer") ] - eq_(sess.query(Person).with_polymorphic('*').all(), - emps_without_relationships) + def test_with_polymorphic_one(self): + sess = create_session() def go(): eq_(sess.query(Person) .with_polymorphic(Engineer) .filter(Engineer.primary_language == 'java').all(), - emps_without_relationships[0:1]) + self._emps_wo_relationships_fixture()[0:1]) self.assert_sql_count(testing.db, go, 1) - sess.expunge_all() + def test_with_polymorphic_two(self): + sess = create_session() def go(): eq_(sess.query(Person) .with_polymorphic('*').all(), - emps_without_relationships) + self._emps_wo_relationships_fixture()) self.assert_sql_count(testing.db, go, 1) - sess.expunge_all() + def test_with_polymorphic_three(self): + sess = create_session() def go(): eq_(sess.query(Person) .with_polymorphic(Engineer).all(), - emps_without_relationships) + self._emps_wo_relationships_fixture()) self.assert_sql_count(testing.db, go, 3) - sess.expunge_all() + def test_with_polymorphic_four(self): + sess = create_session() def go(): eq_(sess.query(Person) .with_polymorphic( Engineer, people.outerjoin(engineers)) .all(), - emps_without_relationships) + self._emps_wo_relationships_fixture()) self.assert_sql_count(testing.db, go, 3) - sess.expunge_all() + def test_with_polymorphic_five(self): + sess = create_session() def go(): # limit the polymorphic join down to just "Person", # overriding select_table eq_(sess.query(Person) .with_polymorphic(Person).all(), - emps_without_relationships) + self._emps_wo_relationships_fixture()) self.assert_sql_count(testing.db, go, 6) + def test_with_polymorphic_six(self): + sess = create_session() + + assert_raises(sa_exc.InvalidRequestError, + sess.query(Person).with_polymorphic, Paperwork) + assert_raises(sa_exc.InvalidRequestError, + sess.query(Engineer).with_polymorphic, Boss) + assert_raises(sa_exc.InvalidRequestError, + sess.query(Engineer).with_polymorphic, Person) + + def test_with_polymorphic_seven(self): + sess = create_session() + # compare to entities without related collections to prevent + # additional lazy SQL from firing on loaded entities + eq_(sess.query(Person).with_polymorphic('*').all(), + self._emps_wo_relationships_fixture()) + + def test_relationship_to_polymorphic(self): expected = [ Company( @@ -846,89 +989,138 @@ def _produce_test(select_type): .filter(Engineer.primary_language == 'java').all(), [c1]) - if select_type == '': - eq_(sess.query(Company) - .select_from(companies.join(people).join(engineers)) - .filter(Engineer.primary_language == 'java').all(), - [c1]) + def test_join_to_subclass_one(self): + sess = create_session() + eq_(sess.query(Company) + .select_from(companies.join(people).join(engineers)) + .filter(Engineer.primary_language == 'java').all(), + [c1]) - eq_(sess.query(Company) - .join(people.join(engineers), 'employees') - .filter(Engineer.primary_language == 'java').all(), - [c1]) + def test_join_to_subclass_two(self): + sess = create_session() + eq_(sess.query(Company) + .join(people.join(engineers), 'employees') + .filter(Engineer.primary_language == 'java').all(), + [c1]) - ealias = aliased(Engineer) - eq_(sess.query(Company) - .join(ealias, 'employees') - .filter(ealias.primary_language == 'java').all(), - [c1]) + def test_join_to_subclass_three(self): + sess = create_session() + ealias = aliased(Engineer) + eq_(sess.query(Company) + .join(ealias, 'employees') + .filter(ealias.primary_language == 'java').all(), + [c1]) + if select_type == '': + def test_join_to_subclass_four(self): + sess = create_session() eq_(sess.query(Person) .select_from(people.join(engineers)) .join(Engineer.machines).all(), [e1, e2, e3]) + def test_join_to_subclass_five(self): + sess = create_session() eq_(sess.query(Person) .select_from(people.join(engineers)) .join(Engineer.machines) .filter(Machine.name.ilike("%ibm%")).all(), [e1, e3]) - eq_(sess.query(Company) - .join(people.join(engineers), 'employees') - .join(Engineer.machines).all(), - [c1, c2]) + def test_join_to_subclass_six(self): + sess = create_session() + eq_(sess.query(Company) + .join(people.join(engineers), 'employees') + .join(Engineer.machines).all(), + [c1, c2]) - eq_(sess.query(Company) - .join(people.join(engineers), 'employees') - .join(Engineer.machines) - .filter(Machine.name.ilike("%thinkpad%")).all(), - [c1]) - else: - eq_(sess.query(Company) - .select_from(companies.join(people).join(engineers)) - .filter(Engineer.primary_language == 'java').all(), - [c1]) + def test_join_to_subclass_seven(self): + sess = create_session() + eq_(sess.query(Company) + .join(people.join(engineers), 'employees') + .join(Engineer.machines) + .filter(Machine.name.ilike("%thinkpad%")).all(), + [c1]) + + + def test_join_to_subclass_eight(self): + sess = create_session() + eq_(sess.query(Person) + .join(Engineer.machines).all(), + [e1, e2, e3]) + def test_join_to_subclass_nine(self): + sess = create_session() + eq_(sess.query(Company) + .select_from(companies.join(people).join(engineers)) + .filter(Engineer.primary_language == 'java').all(), + [c1]) + + if select_type != '': + def test_join_to_subclass_ten(self): + sess = create_session() eq_(sess.query(Company) .join('employees') .filter(Engineer.primary_language == 'java').all(), [c1]) - eq_(sess.query(Person) - .join(Engineer.machines).all(), - [e1, e2, e3]) + def test_join_to_subclass_eleven(self): + sess = create_session() + eq_(sess.query(Company) + .select_from(companies.join(people).join(engineers)) + .filter(Engineer.primary_language == 'java').all(), + [c1]) - eq_(sess.query(Person) - .join(Engineer.machines) - .filter(Machine.name.ilike("%ibm%")).all(), - [e1, e3]) + def test_join_to_subclass_twelve(self): + sess = create_session() + eq_(sess.query(Person) + .join(Engineer.machines).all(), + [e1, e2, e3]) - eq_(sess.query(Company) - .join('employees', Engineer.machines).all(), - [c1, c2]) + def test_join_to_subclass_thirteen(self): + sess = create_session() + eq_(sess.query(Person) + .join(Engineer.machines) + .filter(Machine.name.ilike("%ibm%")).all(), + [e1, e3]) - eq_(sess.query(Company) - .join('employees', Engineer.machines) - .filter(Machine.name.ilike("%thinkpad%")).all(), - [c1]) + def test_join_to_subclass_fourteen(self): + sess = create_session() + eq_(sess.query(Company) + .join('employees', Engineer.machines).all(), + [c1, c2]) + def test_join_to_subclass_fifteen(self): + sess = create_session() + eq_(sess.query(Company) + .join('employees', Engineer.machines) + .filter(Machine.name.ilike("%thinkpad%")).all(), + [c1]) + + def test_join_to_subclass_sixteen(self): + sess = create_session() # non-polymorphic eq_(sess.query(Engineer) .join(Engineer.machines).all(), [e1, e2, e3]) + def test_join_to_subclass_seventeen(self): + sess = create_session() eq_(sess.query(Engineer) .join(Engineer.machines) .filter(Machine.name.ilike("%ibm%")).all(), [e1, e3]) + def test_join_to_subclass_eightteen(self): + sess = create_session() # here's the new way eq_(sess.query(Company) .join(Company.employees.of_type(Engineer)) .filter(Engineer.primary_language == 'java').all(), [c1]) + def test_join_to_subclass_nineteen(self): + sess = create_session() eq_(sess.query(Company) .join(Company.employees.of_type(Engineer), 'machines') .filter(Machine.name.ilike("%thinkpad%")).all(), @@ -953,44 +1145,101 @@ def _produce_test(select_type): .filter(Engineer.primary_language == 'java').count(), 1) - def test_join_through_polymorphic(self): + def test_join_through_polymorphic_nonaliased_one(self): sess = create_session() - for x in (True, False): - eq_(sess.query(Company) - .join('employees', 'paperwork', aliased=x) - .filter(Paperwork.description.like('%#2%')).all(), - [c1]) + eq_(sess.query(Company) + .join('employees', 'paperwork', aliased=False) + .filter(Paperwork.description.like('%#2%')).all(), + [c1]) - eq_(sess.query(Company) - .join('employees', 'paperwork', aliased=x) - .filter(Paperwork.description.like('%#%')).all(), - [c1, c2]) + def test_join_through_polymorphic_nonaliased_two(self): + sess = create_session() + eq_(sess.query(Company) + .join('employees', 'paperwork', aliased=False) + .filter(Paperwork.description.like('%#%')).all(), + [c1, c2]) - eq_(sess.query(Company) - .join('employees', 'paperwork', aliased=x) - .filter(Person.name.in_(['dilbert', 'vlad'])) - .filter(Paperwork.description.like('%#2%')).all(), - [c1]) + def test_join_through_polymorphic_nonaliased_three(self): + sess = create_session() + eq_(sess.query(Company) + .join('employees', 'paperwork', aliased=False) + .filter(Person.name.in_(['dilbert', 'vlad'])) + .filter(Paperwork.description.like('%#2%')).all(), + [c1]) - eq_(sess.query(Company) - .join('employees', 'paperwork', aliased=x) - .filter(Person.name.in_(['dilbert', 'vlad'])) - .filter(Paperwork.description.like('%#%')).all(), - [c1, c2]) + def test_join_through_polymorphic_nonaliased_four(self): + sess = create_session() + eq_(sess.query(Company) + .join('employees', 'paperwork', aliased=False) + .filter(Person.name.in_(['dilbert', 'vlad'])) + .filter(Paperwork.description.like('%#%')).all(), + [c1, c2]) - eq_(sess.query(Company) - .join('employees', aliased=aliased) - .filter(Person.name.in_(['dilbert', 'vlad'])) - .join('paperwork', from_joinpoint=True, aliased=x) - .filter(Paperwork.description.like('%#2%')).all(), - [c1]) + def test_join_through_polymorphic_nonaliased_five(self): + sess = create_session() + eq_(sess.query(Company) + .join('employees', aliased=aliased) + .filter(Person.name.in_(['dilbert', 'vlad'])) + .join('paperwork', from_joinpoint=True, aliased=False) + .filter(Paperwork.description.like('%#2%')).all(), + [c1]) - eq_(sess.query(Company) - .join('employees', aliased=aliased) - .filter(Person.name.in_(['dilbert', 'vlad'])) - .join('paperwork', from_joinpoint=True, aliased=x) - .filter(Paperwork.description.like('%#%')).all(), - [c1, c2]) + def test_join_through_polymorphic_nonaliased_six(self): + sess = create_session() + eq_(sess.query(Company) + .join('employees', aliased=aliased) + .filter(Person.name.in_(['dilbert', 'vlad'])) + .join('paperwork', from_joinpoint=True, aliased=False) + .filter(Paperwork.description.like('%#%')).all(), + [c1, c2]) + + def test_join_through_polymorphic_aliased_one(self): + sess = create_session() + eq_(sess.query(Company) + .join('employees', 'paperwork', aliased=True) + .filter(Paperwork.description.like('%#2%')).all(), + [c1]) + + def test_join_through_polymorphic_aliased_two(self): + sess = create_session() + eq_(sess.query(Company) + .join('employees', 'paperwork', aliased=True) + .filter(Paperwork.description.like('%#%')).all(), + [c1, c2]) + + def test_join_through_polymorphic_aliased_three(self): + sess = create_session() + eq_(sess.query(Company) + .join('employees', 'paperwork', aliased=True) + .filter(Person.name.in_(['dilbert', 'vlad'])) + .filter(Paperwork.description.like('%#2%')).all(), + [c1]) + + def test_join_through_polymorphic_aliased_four(self): + sess = create_session() + eq_(sess.query(Company) + .join('employees', 'paperwork', aliased=True) + .filter(Person.name.in_(['dilbert', 'vlad'])) + .filter(Paperwork.description.like('%#%')).all(), + [c1, c2]) + + def test_join_through_polymorphic_aliased_five(self): + sess = create_session() + eq_(sess.query(Company) + .join('employees', aliased=aliased) + .filter(Person.name.in_(['dilbert', 'vlad'])) + .join('paperwork', from_joinpoint=True, aliased=True) + .filter(Paperwork.description.like('%#2%')).all(), + [c1]) + + def test_join_through_polymorphic_aliased_six(self): + sess = create_session() + eq_(sess.query(Company) + .join('employees', aliased=aliased) + .filter(Person.name.in_(['dilbert', 'vlad'])) + .join('paperwork', from_joinpoint=True, aliased=True) + .filter(Paperwork.description.like('%#%')).all(), + [c1, c2]) def test_explicit_polymorphic_join(self): sess = create_session() @@ -1061,81 +1310,12 @@ def _produce_test(select_type): .filter(Person.person_id.in_(subq)).one(), e1) - def test_mixed_entities(self): - sess = create_session() - - expected = [( - u'Elbonia, Inc.', - Engineer( - status=u'elbonian engineer', - engineer_name=u'vlad', - name=u'vlad', - primary_language=u'cobol'))] - eq_(sess.query(Company.name, Person) - .join(Company.employees) - .filter(Company.name == 'Elbonia, Inc.').all(), - expected) - - expected = [( - Engineer( - status=u'elbonian engineer', - engineer_name=u'vlad', - name=u'vlad', - primary_language=u'cobol'), - u'Elbonia, Inc.')] - eq_(sess.query(Person, Company.name) - .join(Company.employees) - .filter(Company.name == 'Elbonia, Inc.').all(), - expected) - - expected = [('pointy haired boss',), ('dogbert',)] - eq_(sess.query(Manager.name).all(), expected) - - expected = [('pointy haired boss foo',), ('dogbert foo',)] - eq_(sess.query(Manager.name + " foo").all(), expected) - - row = sess.query(Engineer.name, Engineer.primary_language) \ - .filter(Engineer.name == 'dilbert').first() - assert row.name == 'dilbert' - assert row.primary_language == 'java' - - expected = [ - (u'dilbert', u'java'), - (u'wally', u'c++'), - (u'vlad', u'cobol')] - eq_(sess.query(Engineer.name, Engineer.primary_language).all(), - expected) - - expected = [(u'pointy haired boss', u'fore')] - eq_(sess.query(Boss.name, Boss.golf_swing).all(), expected) - - # TODO: I think raise error on these for now. different - # inheritance/loading schemes have different results here, - # all incorrect - # - # eq_( - # sess.query(Person.name, Engineer.primary_language).all(), - # []) - # eq_(sess.query( - # Person.name, - # Engineer.primary_language, - # Manager.manager_name) - # .all(), - # []) - expected = [(u'vlad', u'Elbonia, Inc.')] - eq_(sess.query(Person.name, Company.name) - .join(Company.employees) - .filter(Company.name == 'Elbonia, Inc.').all(), - expected) + if select_type != '': + def test_mixed_entities_one(self): + sess = create_session() - expected = [(u'java',), (u'c++',), (u'cobol',)] - eq_(sess.query(Engineer.primary_language) - .filter(Person.type == 'engineer').all(), - expected) - - if select_type != '': expected = [ (Engineer( status=u'regular engineer', @@ -1169,6 +1349,9 @@ def _produce_test(select_type): .filter(Person.type == 'engineer').all(), expected) + if select_type != '': + def test_mixed_entities_two(self): + sess = create_session() expected = [ (u'java', u'MegaCorp, Inc.'), (u'cobol', u'Elbonia, Inc.'), @@ -1179,6 +1362,8 @@ def _produce_test(select_type): .order_by(desc(Engineer.primary_language)).all(), expected) + def test_mixed_entities_three(self): + sess = create_session() palias = aliased(Person) expected = [( Engineer( @@ -1201,6 +1386,9 @@ def _produce_test(select_type): .filter(palias.name == 'dilbert').all(), expected) + def test_mixed_entities_four(self): + sess = create_session() + palias = aliased(Person) expected = [( Engineer( status=u'regular engineer', @@ -1222,6 +1410,9 @@ def _produce_test(select_type): .filter(palias.name == 'dilbert').all(), expected) + def test_mixed_entities_five(self): + sess = create_session() + palias = aliased(Person) expected = [(u'vlad', u'Elbonia, Inc.', u'dilbert')] eq_(sess.query(Person.name, Company.name, palias.name) .join(Company.employees) @@ -1229,6 +1420,8 @@ def _produce_test(select_type): .filter(palias.name == 'dilbert').all(), expected) + def test_mixed_entities_six(self): + sess = create_session() palias = aliased(Person) expected = [ (u'manager', u'dogbert', u'engineer', u'dilbert'), @@ -1241,6 +1434,8 @@ def _produce_test(select_type): .order_by(Person.person_id, palias.person_id).all(), expected) + def test_mixed_entities_seven(self): + sess = create_session() expected = [ (u'dilbert', u'tps report #1'), (u'dilbert', u'tps report #2'), @@ -1255,11 +1450,15 @@ def _produce_test(select_type): .order_by(Person.name, Paperwork.description).all(), expected) - if select_type != '': + if select_type != '': + def test_mixed_entities_eight(self): + sess = create_session() eq_(sess.query(func.count(Person.person_id)) .filter(Engineer.primary_language == 'java').all(), [(1,)]) + def test_mixed_entities_nine(self): + sess = create_session() expected = [(u'Elbonia, Inc.', 1), (u'MegaCorp, Inc.', 4)] eq_(sess.query(Company.name, func.count(Person.person_id)) .filter(Company.company_id == Person.company_id) @@ -1267,6 +1466,8 @@ def _produce_test(select_type): .order_by(Company.name).all(), expected) + def test_mixed_entities_ten(self): + sess = create_session() expected = [(u'Elbonia, Inc.', 1), (u'MegaCorp, Inc.', 4)] eq_(sess.query(Company.name, func.count(Person.person_id)) .join(Company.employees) @@ -1274,6 +1475,100 @@ def _produce_test(select_type): .order_by(Company.name).all(), expected) + #def test_mixed_entities(self): + # sess = create_session() + # TODO: I think raise error on these for now. different + # inheritance/loading schemes have different results here, + # all incorrect + # + # eq_( + # sess.query(Person.name, Engineer.primary_language).all(), + # []) + + #def test_mixed_entities(self): + # sess = create_session() + # eq_(sess.query( + # Person.name, + # Engineer.primary_language, + # Manager.manager_name) + # .all(), + # []) + + def test_mixed_entities_eleven(self): + sess = create_session() + expected = [(u'java',), (u'c++',), (u'cobol',)] + eq_(sess.query(Engineer.primary_language) + .filter(Person.type == 'engineer').all(), + expected) + + def test_mixed_entities_twelve(self): + sess = create_session() + expected = [(u'vlad', u'Elbonia, Inc.')] + eq_(sess.query(Person.name, Company.name) + .join(Company.employees) + .filter(Company.name == 'Elbonia, Inc.').all(), + expected) + + def test_mixed_entities_thirteen(self): + sess = create_session() + expected = [(u'pointy haired boss', u'fore')] + eq_(sess.query(Boss.name, Boss.golf_swing).all(), expected) + + def test_mixed_entities_fourteen(self): + sess = create_session() + expected = [ + (u'dilbert', u'java'), + (u'wally', u'c++'), + (u'vlad', u'cobol')] + eq_(sess.query(Engineer.name, Engineer.primary_language).all(), + expected) + + def test_mixed_entities_fifteen(self): + sess = create_session() + + expected = [( + u'Elbonia, Inc.', + Engineer( + status=u'elbonian engineer', + engineer_name=u'vlad', + name=u'vlad', + primary_language=u'cobol'))] + eq_(sess.query(Company.name, Person) + .join(Company.employees) + .filter(Company.name == 'Elbonia, Inc.').all(), + expected) + + def test_mixed_entities_sixteen(self): + sess = create_session() + expected = [( + Engineer( + status=u'elbonian engineer', + engineer_name=u'vlad', + name=u'vlad', + primary_language=u'cobol'), + u'Elbonia, Inc.')] + eq_(sess.query(Person, Company.name) + .join(Company.employees) + .filter(Company.name == 'Elbonia, Inc.').all(), + expected) + + def test_mixed_entities_seventeen(self): + sess = create_session() + expected = [('pointy haired boss',), ('dogbert',)] + eq_(sess.query(Manager.name).all(), expected) + + def test_mixed_entities_eighteen(self): + sess = create_session() + expected = [('pointy haired boss foo',), ('dogbert foo',)] + eq_(sess.query(Manager.name + " foo").all(), expected) + + def test_mixed_entities_nineteen(self): + sess = create_session() + row = sess.query(Engineer.name, Engineer.primary_language) \ + .filter(Engineer.name == 'dilbert').first() + assert row.name == 'dilbert' + assert row.primary_language == 'java' + PolymorphicQueryTest.__name__ = "Polymorphic%sTest" % select_type return PolymorphicQueryTest @@ -1282,855 +1577,3 @@ for select_type in ('', 'Polymorphic', 'Unions', 'AliasedJoins', 'Joins'): exec("%s = testclass" % testclass.__name__) del testclass - -class SelfReferentialTestJoinedToBase(fixtures.MappedTest): - - run_setup_mappers = 'once' - - @classmethod - def define_tables(cls, metadata): - Table('people', metadata, - Column('person_id', Integer, - primary_key=True, - test_needs_autoincrement=True), - Column('name', String(50)), - Column('type', String(30))) - - Table('engineers', metadata, - Column('person_id', Integer, - ForeignKey('people.person_id'), - primary_key=True), - Column('primary_language', String(50)), - Column('reports_to_id', Integer, - ForeignKey('people.person_id'))) - - @classmethod - def setup_mappers(cls): - engineers, people = cls.tables.engineers, cls.tables.people - - mapper(Person, people, - polymorphic_on=people.c.type, - polymorphic_identity='person') - - mapper(Engineer, engineers, - inherits=Person, - inherit_condition=engineers.c.person_id == people.c.person_id, - polymorphic_identity='engineer', - properties={ - 'reports_to':relationship( - Person, - primaryjoin= - people.c.person_id == engineers.c.reports_to_id)}) - - def test_has(self): - p1 = Person(name='dogbert') - e1 = Engineer(name='dilbert', primary_language='java', reports_to=p1) - sess = create_session() - sess.add(p1) - sess.add(e1) - sess.flush() - sess.expunge_all() - eq_(sess.query(Engineer) - .filter(Engineer.reports_to.has(Person.name == 'dogbert')) - .first(), - Engineer(name='dilbert')) - - def test_oftype_aliases_in_exists(self): - e1 = Engineer(name='dilbert', primary_language='java') - e2 = Engineer(name='wally', primary_language='c++', reports_to=e1) - sess = create_session() - sess.add_all([e1, e2]) - sess.flush() - eq_(sess.query(Engineer) - .filter(Engineer.reports_to - .of_type(Engineer) - .has(Engineer.name == 'dilbert')) - .first(), - e2) - - def test_join(self): - p1 = Person(name='dogbert') - e1 = Engineer(name='dilbert', primary_language='java', reports_to=p1) - sess = create_session() - sess.add(p1) - sess.add(e1) - sess.flush() - sess.expunge_all() - eq_(sess.query(Engineer) - .join('reports_to', aliased=True) - .filter(Person.name == 'dogbert').first(), - Engineer(name='dilbert')) - -class SelfReferentialJ2JTest(fixtures.MappedTest): - - run_setup_mappers = 'once' - - @classmethod - def define_tables(cls, metadata): - people = Table('people', metadata, - Column('person_id', Integer, - primary_key=True, - test_needs_autoincrement=True), - Column('name', String(50)), - Column('type', String(30))) - - engineers = Table('engineers', metadata, - Column('person_id', Integer, - ForeignKey('people.person_id'), - primary_key=True), - Column('primary_language', String(50)), - Column('reports_to_id', Integer, - ForeignKey('managers.person_id')) - ) - - managers = Table('managers', metadata, - Column('person_id', Integer, ForeignKey('people.person_id'), - primary_key=True), - ) - - @classmethod - def setup_mappers(cls): - engineers = cls.tables.engineers - managers = cls.tables.managers - people = cls.tables.people - - mapper(Person, people, - polymorphic_on=people.c.type, - polymorphic_identity='person') - - mapper(Manager, managers, - inherits=Person, - polymorphic_identity='manager') - - mapper(Engineer, engineers, - inherits=Person, - polymorphic_identity='engineer', - properties={ - 'reports_to':relationship( - Manager, - primaryjoin= - managers.c.person_id == engineers.c.reports_to_id, - backref='engineers')}) - - def test_has(self): - m1 = Manager(name='dogbert') - e1 = Engineer(name='dilbert', primary_language='java', reports_to=m1) - sess = create_session() - sess.add(m1) - sess.add(e1) - sess.flush() - sess.expunge_all() - - eq_(sess.query(Engineer) - .filter(Engineer.reports_to.has(Manager.name == 'dogbert')) - .first(), - Engineer(name='dilbert')) - - def test_join(self): - m1 = Manager(name='dogbert') - e1 = Engineer(name='dilbert', primary_language='java', reports_to=m1) - sess = create_session() - sess.add(m1) - sess.add(e1) - sess.flush() - sess.expunge_all() - - eq_(sess.query(Engineer) - .join('reports_to', aliased=True) - .filter(Manager.name == 'dogbert').first(), - Engineer(name='dilbert')) - - def test_filter_aliasing(self): - m1 = Manager(name='dogbert') - m2 = Manager(name='foo') - e1 = Engineer(name='wally', primary_language='java', reports_to=m1) - e2 = Engineer(name='dilbert', primary_language='c++', reports_to=m2) - e3 = Engineer(name='etc', primary_language='c++') - - sess = create_session() - sess.add_all([m1, m2, e1, e2, e3]) - sess.flush() - sess.expunge_all() - - # filter aliasing applied to Engineer doesn't whack Manager - eq_(sess.query(Manager) - .join(Manager.engineers) - .filter(Manager.name == 'dogbert').all(), - [m1]) - - eq_(sess.query(Manager) - .join(Manager.engineers) - .filter(Engineer.name == 'dilbert').all(), - [m2]) - - eq_(sess.query(Manager, Engineer) - .join(Manager.engineers) - .order_by(Manager.name.desc()).all(), - [(m2, e2), (m1, e1)]) - - def test_relationship_compare(self): - m1 = Manager(name='dogbert') - m2 = Manager(name='foo') - e1 = Engineer(name='dilbert', primary_language='java', reports_to=m1) - e2 = Engineer(name='wally', primary_language='c++', reports_to=m2) - e3 = Engineer(name='etc', primary_language='c++') - - sess = create_session() - sess.add(m1) - sess.add(m2) - sess.add(e1) - sess.add(e2) - sess.add(e3) - sess.flush() - sess.expunge_all() - - eq_(sess.query(Manager) - .join(Manager.engineers) - .filter(Engineer.reports_to == None).all(), - []) - - eq_(sess.query(Manager) - .join(Manager.engineers) - .filter(Engineer.reports_to == m1).all(), - [m1]) - -class SelfReferentialJ2JSelfTest(fixtures.MappedTest): - - run_setup_mappers = 'once' - - @classmethod - def define_tables(cls, metadata): - people = Table('people', metadata, - Column('person_id', Integer, - primary_key=True, - test_needs_autoincrement=True), - Column('name', String(50)), - Column('type', String(30))) - - engineers = Table('engineers', metadata, - Column('person_id', Integer, - ForeignKey('people.person_id'), - primary_key=True), - Column('reports_to_id', Integer, - ForeignKey('engineers.person_id'))) - - @classmethod - def setup_mappers(cls): - engineers = cls.tables.engineers - people = cls.tables.people - - mapper(Person, people, - polymorphic_on=people.c.type, - polymorphic_identity='person') - - mapper(Engineer, engineers, - inherits=Person, - polymorphic_identity='engineer', - properties={ - 'reports_to':relationship( - Engineer, - primaryjoin= - engineers.c.person_id == engineers.c.reports_to_id, - backref='engineers', - remote_side=engineers.c.person_id)}) - - def _two_obj_fixture(self): - e1 = Engineer(name='wally') - e2 = Engineer(name='dilbert', reports_to=e1) - sess = Session() - sess.add_all([e1, e2]) - sess.commit() - return sess - - def _five_obj_fixture(self): - sess = Session() - e1, e2, e3, e4, e5 = [ - Engineer(name='e%d' % (i + 1)) for i in xrange(5) - ] - e3.reports_to = e1 - e4.reports_to = e2 - sess.add_all([e1, e2, e3, e4, e5]) - sess.commit() - return sess - - def test_has(self): - sess = self._two_obj_fixture() - eq_(sess.query(Engineer) - .filter(Engineer.reports_to.has(Engineer.name == 'wally')) - .first(), - Engineer(name='dilbert')) - - def test_join_explicit_alias(self): - sess = self._five_obj_fixture() - ea = aliased(Engineer) - eq_(sess.query(Engineer) - .join(ea, Engineer.engineers) - .filter(Engineer.name == 'e1').all(), - [Engineer(name='e1')]) - - def test_join_aliased_flag_one(self): - sess = self._two_obj_fixture() - eq_(sess.query(Engineer) - .join('reports_to', aliased=True) - .filter(Engineer.name == 'wally').first(), - Engineer(name='dilbert')) - - def test_join_aliased_flag_two(self): - sess = self._five_obj_fixture() - eq_(sess.query(Engineer) - .join(Engineer.engineers, aliased=True) - .filter(Engineer.name == 'e4').all(), - [Engineer(name='e2')]) - - def test_relationship_compare(self): - sess = self._five_obj_fixture() - e1 = sess.query(Engineer).filter_by(name='e1').one() - - eq_(sess.query(Engineer) - .join(Engineer.engineers, aliased=True) - .filter(Engineer.reports_to == None).all(), - []) - - eq_(sess.query(Engineer) - .join(Engineer.engineers, aliased=True) - .filter(Engineer.reports_to == e1).all(), - [e1]) - -class M2MFilterTest(fixtures.MappedTest): - - run_setup_mappers = 'once' - run_inserts = 'once' - run_deletes = None - - @classmethod - def define_tables(cls, metadata): - organizations = Table('organizations', metadata, - Column('id', Integer, - primary_key=True, - test_needs_autoincrement=True), - Column('name', String(50))) - - engineers_to_org = Table('engineers_to_org', metadata, - Column('org_id', Integer, - ForeignKey('organizations.id')), - Column('engineer_id', Integer, - ForeignKey('engineers.person_id'))) - - people = Table('people', metadata, - Column('person_id', Integer, - primary_key=True, - test_needs_autoincrement=True), - Column('name', String(50)), - Column('type', String(30))) - - engineers = Table('engineers', metadata, - Column('person_id', Integer, - ForeignKey('people.person_id'), - primary_key=True), - Column('primary_language', String(50))) - - @classmethod - def setup_mappers(cls): - organizations = cls.tables.organizations - people = cls.tables.people - engineers = cls.tables.engineers - engineers_to_org = cls.tables.engineers_to_org - - class Organization(cls.Comparable): - pass - - mapper(Organization, organizations, - properties={ - 'engineers':relationship( - Engineer, - secondary=engineers_to_org, - backref='organizations')}) - - mapper(Person, people, - polymorphic_on=people.c.type, - polymorphic_identity='person') - - mapper(Engineer, engineers, - inherits=Person, - polymorphic_identity='engineer') - - @classmethod - def insert_data(cls): - Organization = cls.classes.Organization - e1 = Engineer(name='e1') - e2 = Engineer(name='e2') - e3 = Engineer(name='e3') - e4 = Engineer(name='e4') - org1 = Organization(name='org1', engineers=[e1, e2]) - org2 = Organization(name='org2', engineers=[e3, e4]) - sess = create_session() - sess.add(org1) - sess.add(org2) - sess.flush() - - def test_not_contains(self): - Organization = self.classes.Organization - sess = create_session() - e1 = sess.query(Person).filter(Engineer.name == 'e1').one() - - # this works - eq_(sess.query(Organization) - .filter(~Organization.engineers - .of_type(Engineer) - .contains(e1)) - .all(), - [Organization(name='org2')]) - - # this had a bug - eq_(sess.query(Organization) - .filter(~Organization.engineers - .contains(e1)) - .all(), - [Organization(name='org2')]) - - def test_any(self): - sess = create_session() - Organization = self.classes.Organization - - eq_(sess.query(Organization) - .filter(Organization.engineers - .of_type(Engineer) - .any(Engineer.name == 'e1')) - .all(), - [Organization(name='org1')]) - - eq_(sess.query(Organization) - .filter(Organization.engineers - .any(Engineer.name == 'e1')) - .all(), - [Organization(name='org1')]) - -class SelfReferentialM2MTest(fixtures.MappedTest, AssertsCompiledSQL): - - @classmethod - def define_tables(cls, metadata): - Table('secondary', metadata, - Column('left_id', Integer, - ForeignKey('parent.id'), - nullable=False), - Column('right_id', Integer, - ForeignKey('parent.id'), - nullable=False)) - - Table('parent', metadata, - Column('id', Integer, - primary_key=True, - test_needs_autoincrement=True), - Column('cls', String(50))) - - Table('child1', metadata, - Column('id', Integer, - ForeignKey('parent.id'), - primary_key=True)) - - Table('child2', metadata, - Column('id', Integer, - ForeignKey('parent.id'), - primary_key=True)) - - @classmethod - def setup_classes(cls): - class Parent(cls.Basic): - pass - class Child1(Parent): - pass - class Child2(Parent): - pass - - @classmethod - def setup_mappers(cls): - child1 = cls.tables.child1 - child2 = cls.tables.child2 - Parent = cls.classes.Parent - parent = cls.tables.parent - Child1 = cls.classes.Child1 - Child2 = cls.classes.Child2 - secondary = cls.tables.secondary - - mapper(Parent, parent, - polymorphic_on=parent.c.cls) - - mapper(Child1, child1, - inherits=Parent, - polymorphic_identity='child1', - properties={ - 'left_child2':relationship( - Child2, - secondary=secondary, - primaryjoin=parent.c.id == secondary.c.right_id, - secondaryjoin=parent.c.id == secondary.c.left_id, - uselist=False, - backref="right_children")}) - - mapper(Child2, child2, - inherits=Parent, - polymorphic_identity='child2') - - def test_query_crit(self): - Child1, Child2 = self.classes.Child1, self.classes.Child2 - sess = create_session() - c11, c12, c13 = Child1(), Child1(), Child1() - c21, c22, c23 = Child2(), Child2(), Child2() - c11.left_child2 = c22 - c12.left_child2 = c22 - c13.left_child2 = c23 - sess.add_all([c11, c12, c13, c21, c22, c23]) - sess.flush() - - # test that the join to Child2 doesn't alias Child1 in the select - eq_(set(sess.query(Child1).join(Child1.left_child2)), - set([c11, c12, c13])) - - eq_(set(sess.query(Child1, Child2).join(Child1.left_child2)), - set([(c11, c22), (c12, c22), (c13, c23)])) - - # test __eq__() on property is annotating correctly - eq_(set(sess.query(Child2) - .join(Child2.right_children) - .filter(Child1.left_child2 == c22)), - set([c22])) - - # test the same again - self.assert_compile( - sess.query(Child2) - .join(Child2.right_children) - .filter(Child1.left_child2 == c22) - .with_labels().statement, - "SELECT child2.id AS child2_id, parent.id AS parent_id, " - "parent.cls AS parent_cls FROM secondary AS secondary_1, " - "parent JOIN child2 ON parent.id = child2.id JOIN secondary AS " - "secondary_2 ON parent.id = secondary_2.left_id JOIN (SELECT " - "parent.id AS parent_id, parent.cls AS parent_cls, child1.id AS " - "child1_id FROM parent JOIN child1 ON parent.id = child1.id) AS " - "anon_1 ON anon_1.parent_id = secondary_2.right_id WHERE " - "anon_1.parent_id = secondary_1.right_id AND :param_1 = " - "secondary_1.left_id", - dialect=default.DefaultDialect() - ) - - def test_eager_join(self): - Child1, Child2 = self.classes.Child1, self.classes.Child2 - sess = create_session() - c1 = Child1() - c1.left_child2 = Child2() - sess.add(c1) - sess.flush() - - # test that the splicing of the join works here, doesn't break in - # the middle of "parent join child1" - q = sess.query(Child1).options(joinedload('left_child2')) - self.assert_compile(q.limit(1).with_labels().statement, - "SELECT anon_1.child1_id AS anon_1_child1_id, anon_1.parent_id " - "AS anon_1_parent_id, anon_1.parent_cls AS anon_1_parent_cls, " - "anon_2.child2_id AS anon_2_child2_id, anon_2.parent_id AS " - "anon_2_parent_id, anon_2.parent_cls AS anon_2_parent_cls FROM " - "(SELECT child1.id AS child1_id, parent.id AS parent_id, " - "parent.cls AS parent_cls FROM parent JOIN child1 ON parent.id = " - "child1.id LIMIT :param_1) AS anon_1 LEFT OUTER JOIN secondary " - "AS secondary_1 ON anon_1.parent_id = secondary_1.right_id LEFT " - "OUTER JOIN (SELECT parent.id AS parent_id, parent.cls AS " - "parent_cls, child2.id AS child2_id FROM parent JOIN child2 ON " - "parent.id = child2.id) AS anon_2 ON anon_2.parent_id = " - "secondary_1.left_id", - {'param_1':1}, - dialect=default.DefaultDialect()) - - # another way to check - assert q.limit(1).with_labels().subquery().count().scalar() == 1 - assert q.first() is c1 - - def test_subquery_load(self): - Child1, Child2 = self.classes.Child1, self.classes.Child2 - sess = create_session() - c1 = Child1() - c1.left_child2 = Child2() - sess.add(c1) - sess.flush() - sess.expunge_all() - - query_ = sess.query(Child1).options(subqueryload('left_child2')) - for row in query_.all(): - assert row.left_child2 - -class EagerToSubclassTest(fixtures.MappedTest): - """Test eager loads to subclass mappers""" - - run_setup_classes = 'once' - run_setup_mappers = 'once' - run_inserts = 'once' - run_deletes = None - - @classmethod - def define_tables(cls, metadata): - Table('parent', metadata, - Column('id', Integer, - primary_key=True, - test_needs_autoincrement=True), - Column('data', String(10))) - - Table('base', metadata, - Column('id', Integer, - primary_key=True, - test_needs_autoincrement=True), - Column('type', String(10)), - Column('related_id', Integer, - ForeignKey('related.id'))) - - Table('sub', metadata, - Column('id', Integer, - ForeignKey('base.id'), - primary_key=True), - Column('data', String(10)), - Column('parent_id', Integer, - ForeignKey('parent.id'), - nullable=False)) - - Table('related', metadata, - Column('id', Integer, - primary_key=True, - test_needs_autoincrement=True), - Column('data', String(10))) - - @classmethod - def setup_classes(cls): - class Parent(cls.Comparable): - pass - class Base(cls.Comparable): - pass - class Sub(Base): - pass - class Related(cls.Comparable): - pass - - @classmethod - def setup_mappers(cls): - sub = cls.tables.sub - Sub = cls.classes.Sub - base = cls.tables.base - Base = cls.classes.Base - parent = cls.tables.parent - Parent = cls.classes.Parent - related = cls.tables.related - Related = cls.classes.Related - - mapper(Parent, parent, - properties={'children':relationship(Sub, order_by=sub.c.data)}) - - mapper(Base, base, - polymorphic_on=base.c.type, - polymorphic_identity='b', - properties={'related':relationship(Related)}) - - mapper(Sub, sub, - inherits=Base, - polymorphic_identity='s') - - mapper(Related, related) - - @classmethod - def insert_data(cls): - global p1, p2 - - Parent = cls.classes.Parent - Sub = cls.classes.Sub - Related = cls.classes.Related - sess = Session() - r1, r2 = Related(data='r1'), Related(data='r2') - s1 = Sub(data='s1', related=r1) - s2 = Sub(data='s2', related=r2) - s3 = Sub(data='s3') - s4 = Sub(data='s4', related=r2) - s5 = Sub(data='s5') - p1 = Parent(data='p1', children=[s1, s2, s3]) - p2 = Parent(data='p2', children=[s4, s5]) - sess.add(p1) - sess.add(p2) - sess.commit() - - def test_joinedload(self): - Parent = self.classes.Parent - sess = Session() - def go(): - eq_(sess.query(Parent) - .options(joinedload(Parent.children)).all(), - [p1, p2]) - self.assert_sql_count(testing.db, go, 1) - - def test_contains_eager(self): - Parent = self.classes.Parent - Sub = self.classes.Sub - sess = Session() - def go(): - eq_(sess.query(Parent) - .join(Parent.children) - .options(contains_eager(Parent.children)) - .order_by(Parent.data, Sub.data).all(), - [p1, p2]) - self.assert_sql_count(testing.db, go, 1) - - def test_subq_through_related(self): - Parent = self.classes.Parent - Sub = self.classes.Sub - sess = Session() - def go(): - eq_(sess.query(Parent) - .options(subqueryload_all(Parent.children, Sub.related)) - .order_by(Parent.data).all(), - [p1, p2]) - self.assert_sql_count(testing.db, go, 3) - -class SubClassEagerToSubClassTest(fixtures.MappedTest): - """Test joinedloads from subclass to subclass mappers""" - - run_setup_classes = 'once' - run_setup_mappers = 'once' - run_inserts = 'once' - run_deletes = None - - @classmethod - def define_tables(cls, metadata): - Table('parent', metadata, - Column('id', Integer, - primary_key=True, - test_needs_autoincrement=True), - Column('type', String(10)), - ) - - Table('subparent', metadata, - Column('id', Integer, - ForeignKey('parent.id'), - primary_key=True), - Column('data', String(10)), - ) - - Table('base', metadata, - Column('id', Integer, - primary_key=True, - test_needs_autoincrement=True), - Column('type', String(10)), - ) - - Table('sub', metadata, - Column('id', Integer, - ForeignKey('base.id'), - primary_key=True), - Column('data', String(10)), - Column('subparent_id', Integer, - ForeignKey('subparent.id'), - nullable=False) - ) - - @classmethod - def setup_classes(cls): - class Parent(cls.Comparable): - pass - class Subparent(Parent): - pass - class Base(cls.Comparable): - pass - class Sub(Base): - pass - - @classmethod - def setup_mappers(cls): - sub = cls.tables.sub - Sub = cls.classes.Sub - base = cls.tables.base - Base = cls.classes.Base - parent = cls.tables.parent - Parent = cls.classes.Parent - subparent = cls.tables.subparent - Subparent = cls.classes.Subparent - - mapper(Parent, parent, - polymorphic_on=parent.c.type, - polymorphic_identity='b') - - mapper(Subparent, subparent, - inherits=Parent, - polymorphic_identity='s', - properties={ - 'children':relationship(Sub, order_by=base.c.id)}) - - mapper(Base, base, - polymorphic_on=base.c.type, - polymorphic_identity='b') - - mapper(Sub, sub, - inherits=Base, - polymorphic_identity='s') - - @classmethod - def insert_data(cls): - global p1, p2 - - Sub, Subparent = cls.classes.Sub, cls.classes.Subparent - sess = create_session() - p1 = Subparent( - data='p1', - children=[Sub(data='s1'), Sub(data='s2'), Sub(data='s3')]) - p2 = Subparent( - data='p2', - children=[Sub(data='s4'), Sub(data='s5')]) - sess.add(p1) - sess.add(p2) - sess.flush() - - def test_joinedload(self): - Subparent = self.classes.Subparent - - sess = create_session() - def go(): - eq_(sess.query(Subparent) - .options(joinedload(Subparent.children)).all(), - [p1, p2]) - self.assert_sql_count(testing.db, go, 1) - - sess.expunge_all() - def go(): - eq_(sess.query(Subparent) - .options(joinedload("children")).all(), - [p1, p2]) - self.assert_sql_count(testing.db, go, 1) - - def test_contains_eager(self): - Subparent = self.classes.Subparent - - sess = create_session() - def go(): - eq_(sess.query(Subparent) - .join(Subparent.children) - .options(contains_eager(Subparent.children)).all(), - [p1, p2]) - self.assert_sql_count(testing.db, go, 1) - - sess.expunge_all() - def go(): - eq_(sess.query(Subparent) - .join(Subparent.children) - .options(contains_eager("children")).all(), - [p1, p2]) - self.assert_sql_count(testing.db, go, 1) - - def test_subqueryload(self): - Subparent = self.classes.Subparent - - sess = create_session() - def go(): - eq_(sess.query(Subparent) - .options(subqueryload(Subparent.children)).all(), - [p1, p2]) - self.assert_sql_count(testing.db, go, 2) - - sess.expunge_all() - def go(): - eq_(sess.query(Subparent) - .options(subqueryload("children")).all(), - [p1, p2]) - self.assert_sql_count(testing.db, go, 2) - diff --git a/test/orm/inheritance/test_relationship.py b/test/orm/inheritance/test_relationship.py new file mode 100644 index 0000000000..8db5f6b3b2 --- /dev/null +++ b/test/orm/inheritance/test_relationship.py @@ -0,0 +1,877 @@ +from sqlalchemy.orm import create_session, relationship, mapper, \ + contains_eager, joinedload, subqueryload, subqueryload_all,\ + Session, aliased + +from sqlalchemy import Integer, String, ForeignKey +from sqlalchemy.engine import default + +from test.lib import AssertsCompiledSQL, fixtures, testing +from test.lib.schema import Table, Column +from test.lib.testing import assert_raises, eq_ + +class Company(fixtures.ComparableEntity): + pass +class Person(fixtures.ComparableEntity): + pass +class Engineer(Person): + pass +class Manager(Person): + pass +class Boss(Manager): + pass +class Machine(fixtures.ComparableEntity): + pass +class Paperwork(fixtures.ComparableEntity): + pass + +class SelfReferentialTestJoinedToBase(fixtures.MappedTest): + + run_setup_mappers = 'once' + + @classmethod + def define_tables(cls, metadata): + Table('people', metadata, + Column('person_id', Integer, + primary_key=True, + test_needs_autoincrement=True), + Column('name', String(50)), + Column('type', String(30))) + + Table('engineers', metadata, + Column('person_id', Integer, + ForeignKey('people.person_id'), + primary_key=True), + Column('primary_language', String(50)), + Column('reports_to_id', Integer, + ForeignKey('people.person_id'))) + + @classmethod + def setup_mappers(cls): + engineers, people = cls.tables.engineers, cls.tables.people + + mapper(Person, people, + polymorphic_on=people.c.type, + polymorphic_identity='person') + + mapper(Engineer, engineers, + inherits=Person, + inherit_condition=engineers.c.person_id == people.c.person_id, + polymorphic_identity='engineer', + properties={ + 'reports_to':relationship( + Person, + primaryjoin= + people.c.person_id == engineers.c.reports_to_id)}) + + def test_has(self): + p1 = Person(name='dogbert') + e1 = Engineer(name='dilbert', primary_language='java', reports_to=p1) + sess = create_session() + sess.add(p1) + sess.add(e1) + sess.flush() + sess.expunge_all() + eq_(sess.query(Engineer) + .filter(Engineer.reports_to.has(Person.name == 'dogbert')) + .first(), + Engineer(name='dilbert')) + + def test_oftype_aliases_in_exists(self): + e1 = Engineer(name='dilbert', primary_language='java') + e2 = Engineer(name='wally', primary_language='c++', reports_to=e1) + sess = create_session() + sess.add_all([e1, e2]) + sess.flush() + eq_(sess.query(Engineer) + .filter(Engineer.reports_to + .of_type(Engineer) + .has(Engineer.name == 'dilbert')) + .first(), + e2) + + def test_join(self): + p1 = Person(name='dogbert') + e1 = Engineer(name='dilbert', primary_language='java', reports_to=p1) + sess = create_session() + sess.add(p1) + sess.add(e1) + sess.flush() + sess.expunge_all() + eq_(sess.query(Engineer) + .join('reports_to', aliased=True) + .filter(Person.name == 'dogbert').first(), + Engineer(name='dilbert')) + +class SelfReferentialJ2JTest(fixtures.MappedTest): + + run_setup_mappers = 'once' + + @classmethod + def define_tables(cls, metadata): + people = Table('people', metadata, + Column('person_id', Integer, + primary_key=True, + test_needs_autoincrement=True), + Column('name', String(50)), + Column('type', String(30))) + + engineers = Table('engineers', metadata, + Column('person_id', Integer, + ForeignKey('people.person_id'), + primary_key=True), + Column('primary_language', String(50)), + Column('reports_to_id', Integer, + ForeignKey('managers.person_id')) + ) + + managers = Table('managers', metadata, + Column('person_id', Integer, ForeignKey('people.person_id'), + primary_key=True), + ) + + @classmethod + def setup_mappers(cls): + engineers = cls.tables.engineers + managers = cls.tables.managers + people = cls.tables.people + + mapper(Person, people, + polymorphic_on=people.c.type, + polymorphic_identity='person') + + mapper(Manager, managers, + inherits=Person, + polymorphic_identity='manager') + + mapper(Engineer, engineers, + inherits=Person, + polymorphic_identity='engineer', + properties={ + 'reports_to':relationship( + Manager, + primaryjoin= + managers.c.person_id == engineers.c.reports_to_id, + backref='engineers')}) + + def test_has(self): + m1 = Manager(name='dogbert') + e1 = Engineer(name='dilbert', primary_language='java', reports_to=m1) + sess = create_session() + sess.add(m1) + sess.add(e1) + sess.flush() + sess.expunge_all() + + eq_(sess.query(Engineer) + .filter(Engineer.reports_to.has(Manager.name == 'dogbert')) + .first(), + Engineer(name='dilbert')) + + def test_join(self): + m1 = Manager(name='dogbert') + e1 = Engineer(name='dilbert', primary_language='java', reports_to=m1) + sess = create_session() + sess.add(m1) + sess.add(e1) + sess.flush() + sess.expunge_all() + + eq_(sess.query(Engineer) + .join('reports_to', aliased=True) + .filter(Manager.name == 'dogbert').first(), + Engineer(name='dilbert')) + + def test_filter_aliasing(self): + m1 = Manager(name='dogbert') + m2 = Manager(name='foo') + e1 = Engineer(name='wally', primary_language='java', reports_to=m1) + e2 = Engineer(name='dilbert', primary_language='c++', reports_to=m2) + e3 = Engineer(name='etc', primary_language='c++') + + sess = create_session() + sess.add_all([m1, m2, e1, e2, e3]) + sess.flush() + sess.expunge_all() + + # filter aliasing applied to Engineer doesn't whack Manager + eq_(sess.query(Manager) + .join(Manager.engineers) + .filter(Manager.name == 'dogbert').all(), + [m1]) + + eq_(sess.query(Manager) + .join(Manager.engineers) + .filter(Engineer.name == 'dilbert').all(), + [m2]) + + eq_(sess.query(Manager, Engineer) + .join(Manager.engineers) + .order_by(Manager.name.desc()).all(), + [(m2, e2), (m1, e1)]) + + def test_relationship_compare(self): + m1 = Manager(name='dogbert') + m2 = Manager(name='foo') + e1 = Engineer(name='dilbert', primary_language='java', reports_to=m1) + e2 = Engineer(name='wally', primary_language='c++', reports_to=m2) + e3 = Engineer(name='etc', primary_language='c++') + + sess = create_session() + sess.add(m1) + sess.add(m2) + sess.add(e1) + sess.add(e2) + sess.add(e3) + sess.flush() + sess.expunge_all() + + eq_(sess.query(Manager) + .join(Manager.engineers) + .filter(Engineer.reports_to == None).all(), + []) + + eq_(sess.query(Manager) + .join(Manager.engineers) + .filter(Engineer.reports_to == m1).all(), + [m1]) + +class SelfReferentialJ2JSelfTest(fixtures.MappedTest): + + run_setup_mappers = 'once' + + @classmethod + def define_tables(cls, metadata): + people = Table('people', metadata, + Column('person_id', Integer, + primary_key=True, + test_needs_autoincrement=True), + Column('name', String(50)), + Column('type', String(30))) + + engineers = Table('engineers', metadata, + Column('person_id', Integer, + ForeignKey('people.person_id'), + primary_key=True), + Column('reports_to_id', Integer, + ForeignKey('engineers.person_id'))) + + @classmethod + def setup_mappers(cls): + engineers = cls.tables.engineers + people = cls.tables.people + + mapper(Person, people, + polymorphic_on=people.c.type, + polymorphic_identity='person') + + mapper(Engineer, engineers, + inherits=Person, + polymorphic_identity='engineer', + properties={ + 'reports_to':relationship( + Engineer, + primaryjoin= + engineers.c.person_id == engineers.c.reports_to_id, + backref='engineers', + remote_side=engineers.c.person_id)}) + + def _two_obj_fixture(self): + e1 = Engineer(name='wally') + e2 = Engineer(name='dilbert', reports_to=e1) + sess = Session() + sess.add_all([e1, e2]) + sess.commit() + return sess + + def _five_obj_fixture(self): + sess = Session() + e1, e2, e3, e4, e5 = [ + Engineer(name='e%d' % (i + 1)) for i in xrange(5) + ] + e3.reports_to = e1 + e4.reports_to = e2 + sess.add_all([e1, e2, e3, e4, e5]) + sess.commit() + return sess + + def test_has(self): + sess = self._two_obj_fixture() + eq_(sess.query(Engineer) + .filter(Engineer.reports_to.has(Engineer.name == 'wally')) + .first(), + Engineer(name='dilbert')) + + def test_join_explicit_alias(self): + sess = self._five_obj_fixture() + ea = aliased(Engineer) + eq_(sess.query(Engineer) + .join(ea, Engineer.engineers) + .filter(Engineer.name == 'e1').all(), + [Engineer(name='e1')]) + + def test_join_aliased_flag_one(self): + sess = self._two_obj_fixture() + eq_(sess.query(Engineer) + .join('reports_to', aliased=True) + .filter(Engineer.name == 'wally').first(), + Engineer(name='dilbert')) + + def test_join_aliased_flag_two(self): + sess = self._five_obj_fixture() + eq_(sess.query(Engineer) + .join(Engineer.engineers, aliased=True) + .filter(Engineer.name == 'e4').all(), + [Engineer(name='e2')]) + + def test_relationship_compare(self): + sess = self._five_obj_fixture() + e1 = sess.query(Engineer).filter_by(name='e1').one() + + eq_(sess.query(Engineer) + .join(Engineer.engineers, aliased=True) + .filter(Engineer.reports_to == None).all(), + []) + + eq_(sess.query(Engineer) + .join(Engineer.engineers, aliased=True) + .filter(Engineer.reports_to == e1).all(), + [e1]) + +class M2MFilterTest(fixtures.MappedTest): + + run_setup_mappers = 'once' + run_inserts = 'once' + run_deletes = None + + @classmethod + def define_tables(cls, metadata): + organizations = Table('organizations', metadata, + Column('id', Integer, + primary_key=True, + test_needs_autoincrement=True), + Column('name', String(50))) + + engineers_to_org = Table('engineers_to_org', metadata, + Column('org_id', Integer, + ForeignKey('organizations.id')), + Column('engineer_id', Integer, + ForeignKey('engineers.person_id'))) + + people = Table('people', metadata, + Column('person_id', Integer, + primary_key=True, + test_needs_autoincrement=True), + Column('name', String(50)), + Column('type', String(30))) + + engineers = Table('engineers', metadata, + Column('person_id', Integer, + ForeignKey('people.person_id'), + primary_key=True), + Column('primary_language', String(50))) + + @classmethod + def setup_mappers(cls): + organizations = cls.tables.organizations + people = cls.tables.people + engineers = cls.tables.engineers + engineers_to_org = cls.tables.engineers_to_org + + class Organization(cls.Comparable): + pass + + mapper(Organization, organizations, + properties={ + 'engineers':relationship( + Engineer, + secondary=engineers_to_org, + backref='organizations')}) + + mapper(Person, people, + polymorphic_on=people.c.type, + polymorphic_identity='person') + + mapper(Engineer, engineers, + inherits=Person, + polymorphic_identity='engineer') + + @classmethod + def insert_data(cls): + Organization = cls.classes.Organization + e1 = Engineer(name='e1') + e2 = Engineer(name='e2') + e3 = Engineer(name='e3') + e4 = Engineer(name='e4') + org1 = Organization(name='org1', engineers=[e1, e2]) + org2 = Organization(name='org2', engineers=[e3, e4]) + sess = create_session() + sess.add(org1) + sess.add(org2) + sess.flush() + + def test_not_contains(self): + Organization = self.classes.Organization + sess = create_session() + e1 = sess.query(Person).filter(Engineer.name == 'e1').one() + + # this works + eq_(sess.query(Organization) + .filter(~Organization.engineers + .of_type(Engineer) + .contains(e1)) + .all(), + [Organization(name='org2')]) + + # this had a bug + eq_(sess.query(Organization) + .filter(~Organization.engineers + .contains(e1)) + .all(), + [Organization(name='org2')]) + + def test_any(self): + sess = create_session() + Organization = self.classes.Organization + + eq_(sess.query(Organization) + .filter(Organization.engineers + .of_type(Engineer) + .any(Engineer.name == 'e1')) + .all(), + [Organization(name='org1')]) + + eq_(sess.query(Organization) + .filter(Organization.engineers + .any(Engineer.name == 'e1')) + .all(), + [Organization(name='org1')]) + +class SelfReferentialM2MTest(fixtures.MappedTest, AssertsCompiledSQL): + + @classmethod + def define_tables(cls, metadata): + Table('secondary', metadata, + Column('left_id', Integer, + ForeignKey('parent.id'), + nullable=False), + Column('right_id', Integer, + ForeignKey('parent.id'), + nullable=False)) + + Table('parent', metadata, + Column('id', Integer, + primary_key=True, + test_needs_autoincrement=True), + Column('cls', String(50))) + + Table('child1', metadata, + Column('id', Integer, + ForeignKey('parent.id'), + primary_key=True)) + + Table('child2', metadata, + Column('id', Integer, + ForeignKey('parent.id'), + primary_key=True)) + + @classmethod + def setup_classes(cls): + class Parent(cls.Basic): + pass + class Child1(Parent): + pass + class Child2(Parent): + pass + + @classmethod + def setup_mappers(cls): + child1 = cls.tables.child1 + child2 = cls.tables.child2 + Parent = cls.classes.Parent + parent = cls.tables.parent + Child1 = cls.classes.Child1 + Child2 = cls.classes.Child2 + secondary = cls.tables.secondary + + mapper(Parent, parent, + polymorphic_on=parent.c.cls) + + mapper(Child1, child1, + inherits=Parent, + polymorphic_identity='child1', + properties={ + 'left_child2':relationship( + Child2, + secondary=secondary, + primaryjoin=parent.c.id == secondary.c.right_id, + secondaryjoin=parent.c.id == secondary.c.left_id, + uselist=False, + backref="right_children")}) + + mapper(Child2, child2, + inherits=Parent, + polymorphic_identity='child2') + + def test_query_crit(self): + Child1, Child2 = self.classes.Child1, self.classes.Child2 + sess = create_session() + c11, c12, c13 = Child1(), Child1(), Child1() + c21, c22, c23 = Child2(), Child2(), Child2() + c11.left_child2 = c22 + c12.left_child2 = c22 + c13.left_child2 = c23 + sess.add_all([c11, c12, c13, c21, c22, c23]) + sess.flush() + + # test that the join to Child2 doesn't alias Child1 in the select + eq_(set(sess.query(Child1).join(Child1.left_child2)), + set([c11, c12, c13])) + + eq_(set(sess.query(Child1, Child2).join(Child1.left_child2)), + set([(c11, c22), (c12, c22), (c13, c23)])) + + # test __eq__() on property is annotating correctly + eq_(set(sess.query(Child2) + .join(Child2.right_children) + .filter(Child1.left_child2 == c22)), + set([c22])) + + # test the same again + self.assert_compile( + sess.query(Child2) + .join(Child2.right_children) + .filter(Child1.left_child2 == c22) + .with_labels().statement, + "SELECT child2.id AS child2_id, parent.id AS parent_id, " + "parent.cls AS parent_cls FROM secondary AS secondary_1, " + "parent JOIN child2 ON parent.id = child2.id JOIN secondary AS " + "secondary_2 ON parent.id = secondary_2.left_id JOIN (SELECT " + "parent.id AS parent_id, parent.cls AS parent_cls, child1.id AS " + "child1_id FROM parent JOIN child1 ON parent.id = child1.id) AS " + "anon_1 ON anon_1.parent_id = secondary_2.right_id WHERE " + "anon_1.parent_id = secondary_1.right_id AND :param_1 = " + "secondary_1.left_id", + dialect=default.DefaultDialect() + ) + + def test_eager_join(self): + Child1, Child2 = self.classes.Child1, self.classes.Child2 + sess = create_session() + c1 = Child1() + c1.left_child2 = Child2() + sess.add(c1) + sess.flush() + + # test that the splicing of the join works here, doesn't break in + # the middle of "parent join child1" + q = sess.query(Child1).options(joinedload('left_child2')) + self.assert_compile(q.limit(1).with_labels().statement, + "SELECT anon_1.child1_id AS anon_1_child1_id, anon_1.parent_id " + "AS anon_1_parent_id, anon_1.parent_cls AS anon_1_parent_cls, " + "anon_2.child2_id AS anon_2_child2_id, anon_2.parent_id AS " + "anon_2_parent_id, anon_2.parent_cls AS anon_2_parent_cls FROM " + "(SELECT child1.id AS child1_id, parent.id AS parent_id, " + "parent.cls AS parent_cls FROM parent JOIN child1 ON parent.id = " + "child1.id LIMIT :param_1) AS anon_1 LEFT OUTER JOIN secondary " + "AS secondary_1 ON anon_1.parent_id = secondary_1.right_id LEFT " + "OUTER JOIN (SELECT parent.id AS parent_id, parent.cls AS " + "parent_cls, child2.id AS child2_id FROM parent JOIN child2 ON " + "parent.id = child2.id) AS anon_2 ON anon_2.parent_id = " + "secondary_1.left_id", + {'param_1':1}, + dialect=default.DefaultDialect()) + + # another way to check + assert q.limit(1).with_labels().subquery().count().scalar() == 1 + assert q.first() is c1 + + def test_subquery_load(self): + Child1, Child2 = self.classes.Child1, self.classes.Child2 + sess = create_session() + c1 = Child1() + c1.left_child2 = Child2() + sess.add(c1) + sess.flush() + sess.expunge_all() + + query_ = sess.query(Child1).options(subqueryload('left_child2')) + for row in query_.all(): + assert row.left_child2 + +class EagerToSubclassTest(fixtures.MappedTest): + """Test eager loads to subclass mappers""" + + run_setup_classes = 'once' + run_setup_mappers = 'once' + run_inserts = 'once' + run_deletes = None + + @classmethod + def define_tables(cls, metadata): + Table('parent', metadata, + Column('id', Integer, + primary_key=True, + test_needs_autoincrement=True), + Column('data', String(10))) + + Table('base', metadata, + Column('id', Integer, + primary_key=True, + test_needs_autoincrement=True), + Column('type', String(10)), + Column('related_id', Integer, + ForeignKey('related.id'))) + + Table('sub', metadata, + Column('id', Integer, + ForeignKey('base.id'), + primary_key=True), + Column('data', String(10)), + Column('parent_id', Integer, + ForeignKey('parent.id'), + nullable=False)) + + Table('related', metadata, + Column('id', Integer, + primary_key=True, + test_needs_autoincrement=True), + Column('data', String(10))) + + @classmethod + def setup_classes(cls): + class Parent(cls.Comparable): + pass + class Base(cls.Comparable): + pass + class Sub(Base): + pass + class Related(cls.Comparable): + pass + + @classmethod + def setup_mappers(cls): + sub = cls.tables.sub + Sub = cls.classes.Sub + base = cls.tables.base + Base = cls.classes.Base + parent = cls.tables.parent + Parent = cls.classes.Parent + related = cls.tables.related + Related = cls.classes.Related + + mapper(Parent, parent, + properties={'children':relationship(Sub, order_by=sub.c.data)}) + + mapper(Base, base, + polymorphic_on=base.c.type, + polymorphic_identity='b', + properties={'related':relationship(Related)}) + + mapper(Sub, sub, + inherits=Base, + polymorphic_identity='s') + + mapper(Related, related) + + @classmethod + def insert_data(cls): + global p1, p2 + + Parent = cls.classes.Parent + Sub = cls.classes.Sub + Related = cls.classes.Related + sess = Session() + r1, r2 = Related(data='r1'), Related(data='r2') + s1 = Sub(data='s1', related=r1) + s2 = Sub(data='s2', related=r2) + s3 = Sub(data='s3') + s4 = Sub(data='s4', related=r2) + s5 = Sub(data='s5') + p1 = Parent(data='p1', children=[s1, s2, s3]) + p2 = Parent(data='p2', children=[s4, s5]) + sess.add(p1) + sess.add(p2) + sess.commit() + + def test_joinedload(self): + Parent = self.classes.Parent + sess = Session() + def go(): + eq_(sess.query(Parent) + .options(joinedload(Parent.children)).all(), + [p1, p2]) + self.assert_sql_count(testing.db, go, 1) + + def test_contains_eager(self): + Parent = self.classes.Parent + Sub = self.classes.Sub + sess = Session() + def go(): + eq_(sess.query(Parent) + .join(Parent.children) + .options(contains_eager(Parent.children)) + .order_by(Parent.data, Sub.data).all(), + [p1, p2]) + self.assert_sql_count(testing.db, go, 1) + + def test_subq_through_related(self): + Parent = self.classes.Parent + Sub = self.classes.Sub + sess = Session() + def go(): + eq_(sess.query(Parent) + .options(subqueryload_all(Parent.children, Sub.related)) + .order_by(Parent.data).all(), + [p1, p2]) + self.assert_sql_count(testing.db, go, 3) + +class SubClassEagerToSubClassTest(fixtures.MappedTest): + """Test joinedloads from subclass to subclass mappers""" + + run_setup_classes = 'once' + run_setup_mappers = 'once' + run_inserts = 'once' + run_deletes = None + + @classmethod + def define_tables(cls, metadata): + Table('parent', metadata, + Column('id', Integer, + primary_key=True, + test_needs_autoincrement=True), + Column('type', String(10)), + ) + + Table('subparent', metadata, + Column('id', Integer, + ForeignKey('parent.id'), + primary_key=True), + Column('data', String(10)), + ) + + Table('base', metadata, + Column('id', Integer, + primary_key=True, + test_needs_autoincrement=True), + Column('type', String(10)), + ) + + Table('sub', metadata, + Column('id', Integer, + ForeignKey('base.id'), + primary_key=True), + Column('data', String(10)), + Column('subparent_id', Integer, + ForeignKey('subparent.id'), + nullable=False) + ) + + @classmethod + def setup_classes(cls): + class Parent(cls.Comparable): + pass + class Subparent(Parent): + pass + class Base(cls.Comparable): + pass + class Sub(Base): + pass + + @classmethod + def setup_mappers(cls): + sub = cls.tables.sub + Sub = cls.classes.Sub + base = cls.tables.base + Base = cls.classes.Base + parent = cls.tables.parent + Parent = cls.classes.Parent + subparent = cls.tables.subparent + Subparent = cls.classes.Subparent + + mapper(Parent, parent, + polymorphic_on=parent.c.type, + polymorphic_identity='b') + + mapper(Subparent, subparent, + inherits=Parent, + polymorphic_identity='s', + properties={ + 'children':relationship(Sub, order_by=base.c.id)}) + + mapper(Base, base, + polymorphic_on=base.c.type, + polymorphic_identity='b') + + mapper(Sub, sub, + inherits=Base, + polymorphic_identity='s') + + @classmethod + def insert_data(cls): + global p1, p2 + + Sub, Subparent = cls.classes.Sub, cls.classes.Subparent + sess = create_session() + p1 = Subparent( + data='p1', + children=[Sub(data='s1'), Sub(data='s2'), Sub(data='s3')]) + p2 = Subparent( + data='p2', + children=[Sub(data='s4'), Sub(data='s5')]) + sess.add(p1) + sess.add(p2) + sess.flush() + + def test_joinedload(self): + Subparent = self.classes.Subparent + + sess = create_session() + def go(): + eq_(sess.query(Subparent) + .options(joinedload(Subparent.children)).all(), + [p1, p2]) + self.assert_sql_count(testing.db, go, 1) + + sess.expunge_all() + def go(): + eq_(sess.query(Subparent) + .options(joinedload("children")).all(), + [p1, p2]) + self.assert_sql_count(testing.db, go, 1) + + def test_contains_eager(self): + Subparent = self.classes.Subparent + + sess = create_session() + def go(): + eq_(sess.query(Subparent) + .join(Subparent.children) + .options(contains_eager(Subparent.children)).all(), + [p1, p2]) + self.assert_sql_count(testing.db, go, 1) + + sess.expunge_all() + def go(): + eq_(sess.query(Subparent) + .join(Subparent.children) + .options(contains_eager("children")).all(), + [p1, p2]) + self.assert_sql_count(testing.db, go, 1) + + def test_subqueryload(self): + Subparent = self.classes.Subparent + + sess = create_session() + def go(): + eq_(sess.query(Subparent) + .options(subqueryload(Subparent.children)).all(), + [p1, p2]) + self.assert_sql_count(testing.db, go, 2) + + sess.expunge_all() + def go(): + eq_(sess.query(Subparent) + .options(subqueryload("children")).all(), + [p1, p2]) + self.assert_sql_count(testing.db, go, 2) + -- 2.47.2