with_polymorphic = mapper._with_polymorphic_mappers
if mapper.mapped_table not in \
self._polymorphic_adapters:
- self.__mapper_loads_polymorphically_with(mapper,
+ self._mapper_loads_polymorphically_with(mapper,
sql_util.ColumnAdapter(
selectable,
mapper._equivalent_columns))
is_aliased_class, with_polymorphic)
ent.setup_entity(entity, *d[entity])
- def __mapper_loads_polymorphically_with(self, mapper, adapter):
+ def _mapper_loads_polymorphically_with(self, mapper, adapter):
for m2 in mapper._with_polymorphic_mappers:
self._polymorphic_adapters[m2] = adapter
for m in m2.iterate_to_root():
self._from_obj_alias = sql_util.ColumnAdapter(
self._from_obj[0], equivs)
- def _get_polymorphic_adapter(self, entity, selectable):
- self.__mapper_loads_polymorphically_with(entity.mapper,
- sql_util.ColumnAdapter(selectable,
- entity.mapper._equivalent_columns))
def _reset_polymorphic_adapter(self, mapper):
for m2 in mapper._with_polymorphic_mappers:
)
return self._entity_zero()
- def _generate_mapper_zero(self):
- if not getattr(self._entities[0], 'primary_entity', False):
- raise sa_exc.InvalidRequestError(
- "No primary mapper set up for this Query.")
- entity = self._entities[0]._clone()
- self._entities = [entity] + self._entities[1:]
- return entity
def __all_equivs(self):
equivs = {}
such as concrete table mappers.
"""
- entity = self._generate_mapper_zero()
+
+ if not getattr(self._entities[0], 'primary_entity', False):
+ raise sa_exc.InvalidRequestError(
+ "No primary mapper set up for this Query.")
+ entity = self._entities[0]._clone()
+ self._entities = [entity] + self._entities[1:]
entity.set_with_polymorphic(self,
cls_or_mappers,
selectable=selectable,
consistent format with which to form the actual JOIN constructs.
"""
- self._polymorphic_adapters = self._polymorphic_adapters.copy()
if not from_joinpoint:
self._reset_joinpoint()
onclause, outerjoin, create_aliases, prop):
"""append a JOIN to the query's from clause."""
+ self._polymorphic_adapters = self._polymorphic_adapters.copy()
+
if left is None:
if self._from_obj:
left = self._from_obj[0]
# until reset_joinpoint() is called.
if need_adapter:
self._filter_aliases = ORMAdapter(right,
- equivalents=right_mapper and right_mapper._equivalent_columns or {},
+ equivalents=right_mapper and
+ right_mapper._equivalent_columns or {},
chain_to=self._filter_aliases)
# if the onclause is a ClauseElement, adapt it with any
# ensure that columns retrieved from this target in the result
# set are also adapted.
if aliased_entity and not create_aliases:
- self.__mapper_loads_polymorphically_with(
+ self._mapper_loads_polymorphically_with(
right_mapper,
ORMAdapter(
right,
# with_polymorphic() can be applied to aliases
if not self.is_aliased_class:
self.selectable = from_obj
- self.adapter = query._get_polymorphic_adapter(self, from_obj)
+ query._mapper_loads_polymorphically_with(self.mapper,
+ sql_util.ColumnAdapter(from_obj,
+ self.mapper._equivalent_columns))
filter_fn = id
-"""this is a test suite consisting mainly of end-user test cases, testing all kinds of painful
-inheritance setups for which we maintain compatibility.
+"""Very old inheritance-related tests.
+
+
"""
from test.lib.testing import eq_
global people, managers
people = Table('people', metadata,
- Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
- Column('manager_id', Integer, ForeignKey('managers.person_id', use_alter=True, name="mpid_fq")),
+ Column('person_id', Integer, Sequence('person_id_seq',
+ optional=True),
+ primary_key=True),
+ Column('manager_id', Integer,
+ ForeignKey('managers.person_id',
+ use_alter=True, name="mpid_fq")),
Column('name', String(50)),
Column('type', String(30)))
managers = Table('managers', metadata,
- Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
+ Column('person_id', Integer, ForeignKey('people.person_id'),
+ primary_key=True),
Column('status', String(30)),
Column('manager_name', String(50))
)
class Manager(Person):
pass
- # note that up until recently (0.4.4), we had to specify "foreign_keys" here
- # for this primary join.
mapper(Person, people, properties={
- 'manager':relationship(Manager, primaryjoin=(people.c.manager_id ==
- managers.c.person_id),
+ 'manager':relationship(Manager, primaryjoin=(
+ people.c.manager_id ==
+ managers.c.person_id),
uselist=False, post_update=True)
})
mapper(Manager, managers, inherits=Person,
inherit_condition=people.c.person_id==managers.c.person_id)
- eq_(class_mapper(Person).get_property('manager').synchronize_pairs, [(managers.c.person_id,people.c.manager_id)])
+ eq_(class_mapper(Person).get_property('manager').synchronize_pairs,
+ [(managers.c.person_id,people.c.manager_id)])
session = create_session()
p = Person(name='some person')
p = session.query(Person).get(p.person_id)
m = session.query(Manager).get(m.person_id)
- print p, m, p.manager
assert p.manager is m
def test_descendant_refs_parent(self):
pass
mapper(Person, people)
- mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id, properties={
- 'employee':relationship(Person, primaryjoin=(people.c.manager_id ==
+ mapper(Manager, managers, inherits=Person,
+ inherit_condition=people.c.person_id==
+ managers.c.person_id,
+ properties={
+ 'employee':relationship(Person, primaryjoin=(
+ people.c.manager_id ==
managers.c.person_id),
foreign_keys=[people.c.manager_id],
uselist=False, post_update=True)
p = session.query(Person).get(p.person_id)
m = session.query(Manager).get(m.person_id)
- print p, m, m.employee
assert m.employee is p
class RelationshipTest2(fixtures.MappedTest):
def define_tables(cls, metadata):
global people, managers, data
people = Table('people', metadata,
- Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('person_id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('name', String(50)),
Column('type', String(30)))
managers = Table('managers', metadata,
- Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
+ Column('person_id', Integer, ForeignKey('people.person_id'),
+ primary_key=True),
Column('manager_id', Integer, ForeignKey('people.person_id')),
Column('status', String(30)),
)
data = Table('data', metadata,
- Column('person_id', Integer, ForeignKey('managers.person_id'), primary_key=True),
+ Column('person_id', Integer, ForeignKey('managers.person_id'),
+ primary_key=True),
Column('data', String(30))
)
if jointype == "join1":
poly_union = polymorphic_union({
'person':people.select(people.c.type=='person'),
- 'manager':join(people, managers, people.c.person_id==managers.c.person_id)
+ 'manager':join(people, managers,
+ people.c.person_id==managers.c.person_id)
}, None)
polymorphic_on=poly_union.c.type
elif jointype == "join2":
poly_union = polymorphic_union({
'person':people.select(people.c.type=='person'),
- 'manager':managers.join(people, people.c.person_id==managers.c.person_id)
+ 'manager':managers.join(people,
+ people.c.person_id==managers.c.person_id)
}, None)
polymorphic_on=poly_union.c.type
elif jointype == "join3":
self.data = data
mapper(Data, data)
- mapper(Person, people, with_polymorphic=('*', poly_union), polymorphic_identity='person', polymorphic_on=polymorphic_on)
+ mapper(Person, people,
+ with_polymorphic=('*', poly_union),
+ polymorphic_identity='person',
+ polymorphic_on=polymorphic_on)
if usedata:
- mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id, polymorphic_identity='manager',
+ mapper(Manager, managers,
+ inherits=Person,
+ inherit_condition=people.c.person_id==
+ managers.c.person_id,
+ polymorphic_identity='manager',
properties={
- 'colleague':relationship(Person, primaryjoin=managers.c.manager_id==people.c.person_id, lazy='select', uselist=False),
+ 'colleague':relationship(
+ Person,
+ primaryjoin=managers.c.manager_id==
+ people.c.person_id,
+ lazy='select', uselist=False),
'data':relationship(Data, uselist=False)
}
)
else:
- mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id, polymorphic_identity='manager',
+ mapper(Manager, managers, inherits=Person,
+ inherit_condition=people.c.person_id==
+ managers.c.person_id,
+ polymorphic_identity='manager',
properties={
- 'colleague':relationship(Person, primaryjoin=managers.c.manager_id==people.c.person_id, lazy='select', uselist=False)
+ 'colleague':relationship(Person,
+ primaryjoin=managers.c.manager_id==
+ people.c.person_id,
+ lazy='select', uselist=False)
}
)
sess.expunge_all()
p = sess.query(Person).get(p.person_id)
m = sess.query(Manager).get(m.person_id)
- print p
- print m
assert m.colleague is p
if usedata:
assert m.data.data == 'ms data'
def define_tables(cls, metadata):
global people, managers, data
people = Table('people', metadata,
- Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('person_id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('colleague_id', Integer, ForeignKey('people.person_id')),
Column('name', String(50)),
Column('type', String(30)))
managers = Table('managers', metadata,
- Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
+ Column('person_id', Integer, ForeignKey('people.person_id'),
+ primary_key=True),
Column('status', String(30)),
)
data = Table('data', metadata,
- Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
+ Column('person_id', Integer, ForeignKey('people.person_id'),
+ primary_key=True),
Column('data', String(30))
)
if jointype == "join1":
poly_union = polymorphic_union({
- 'manager':managers.join(people, people.c.person_id==managers.c.person_id),
+ 'manager':managers.join(people,
+ people.c.person_id==managers.c.person_id),
'person':people.select(people.c.type=='person')
}, None)
elif jointype =="join2":
poly_union = polymorphic_union({
- 'manager':join(people, managers, people.c.person_id==managers.c.person_id),
+ 'manager':join(people, managers,
+ people.c.person_id==managers.c.person_id),
'person':people.select(people.c.type=='person')
}, None)
elif jointype == 'join3':
mapper(Data, data)
if usedata:
- mapper(Person, people, with_polymorphic=('*', poly_union), polymorphic_identity='person', polymorphic_on=people.c.type,
+ mapper(Person, people,
+ with_polymorphic=('*', poly_union),
+ polymorphic_identity='person',
+ polymorphic_on=people.c.type,
properties={
- 'colleagues':relationship(Person, primaryjoin=people.c.colleague_id==people.c.person_id, remote_side=people.c.colleague_id, uselist=True),
+ 'colleagues':relationship(Person,
+ primaryjoin=people.c.colleague_id==
+ people.c.person_id,
+ remote_side=people.c.colleague_id,
+ uselist=True),
'data':relationship(Data, uselist=False)
}
)
else:
- mapper(Person, people, with_polymorphic=('*', poly_union), polymorphic_identity='person', polymorphic_on=people.c.type,
+ mapper(Person, people,
+ with_polymorphic=('*', poly_union),
+ polymorphic_identity='person',
+ polymorphic_on=people.c.type,
properties={
- 'colleagues':relationship(Person, primaryjoin=people.c.colleague_id==people.c.person_id,
+ 'colleagues':relationship(Person,
+ primaryjoin=people.c.colleague_id==people.c.person_id,
remote_side=people.c.colleague_id, uselist=True)
}
)
- mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id, polymorphic_identity='manager')
+ mapper(Manager, managers, inherits=Person,
+ inherit_condition=people.c.person_id==
+ managers.c.person_id,
+ polymorphic_identity='manager')
sess = create_session()
p = Person(name='person1')
p2 = sess.query(Person).get(p2.person_id)
p3 = sess.query(Person).get(p3.person_id)
m = sess.query(Person).get(m.person_id)
- print p, p2, p.colleagues, m.colleagues
assert len(p.colleagues) == 1
assert p.colleagues == [p2]
assert m.colleagues == [p3]
def define_tables(cls, metadata):
global people, engineers, managers, cars
people = Table('people', metadata,
- Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('person_id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('name', String(50)))
engineers = Table('engineers', metadata,
- Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
+ Column('person_id', Integer, ForeignKey('people.person_id'),
+ primary_key=True),
Column('status', String(30)))
managers = Table('managers', metadata,
- Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
+ Column('person_id', Integer, ForeignKey('people.person_id'),
+ primary_key=True),
Column('longer_status', String(70)))
cars = Table('cars', metadata,
- Column('car_id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('car_id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('owner', Integer, ForeignKey('people.person_id')))
- def testmanytoonepolymorphic(self):
- """in this test, the polymorphic union is between two subclasses, but does not include the base table by itself
- in the union. however, the primaryjoin condition is going to be against the base table, and its a many-to-one
- relationship (unlike the test in polymorph.py) so the column in the base table is explicit. Can the ClauseAdapter
- figure out how to alias the primaryjoin to the polymorphic union ?"""
+ def test_many_to_one_polymorphic(self):
+ """in this test, the polymorphic union is between two subclasses, but
+ does not include the base table by itself in the union. however, the
+ primaryjoin condition is going to be against the base table, and its a
+ many-to-one relationship (unlike the test in polymorph.py) so the
+ column in the base table is explicit. Can the ClauseAdapter figure out
+ how to alias the primaryjoin to the polymorphic union ?"""
# class definitions
class Person(object):
return "Ordinary person %s" % self.name
class Engineer(Person):
def __repr__(self):
- return "Engineer %s, status %s" % (self.name, self.status)
+ return "Engineer %s, status %s" % \
+ (self.name, self.status)
class Manager(Person):
def __repr__(self):
- return "Manager %s, status %s" % (self.name, self.longer_status)
+ return "Manager %s, status %s" % \
+ (self.name, self.longer_status)
class Car(object):
def __init__(self, **kwargs):
for key, value in kwargs.iteritems():
'manager':people.join(managers),
}, "type", 'employee_join')
- person_mapper = mapper(Person, people, with_polymorphic=('*', employee_join), polymorphic_on=employee_join.c.type, polymorphic_identity='person')
- engineer_mapper = mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer')
- manager_mapper = mapper(Manager, managers, inherits=person_mapper, polymorphic_identity='manager')
- car_mapper = mapper(Car, cars, properties= {'employee':relationship(person_mapper)})
+ person_mapper = mapper(Person, people,
+ with_polymorphic=('*', employee_join),
+ polymorphic_on=employee_join.c.type,
+ polymorphic_identity='person')
+ engineer_mapper = mapper(Engineer, engineers,
+ inherits=person_mapper,
+ polymorphic_identity='engineer')
+ manager_mapper = mapper(Manager, managers,
+ inherits=person_mapper,
+ polymorphic_identity='manager')
+ car_mapper = mapper(Car, cars,
+ properties= {'employee':
+ relationship(person_mapper)})
session = create_session()
# creating 5 managers named from M1 to E5
for i in range(1,5):
- session.add(Manager(name="M%d" % i,longer_status="YYYYYYYYY"))
+ session.add(Manager(name="M%d" % i,
+ longer_status="YYYYYYYYY"))
# creating 5 engineers named from E1 to E5
for i in range(1,5):
session.add(Engineer(name="E%d" % i,status="X"))
session.flush()
- engineer4 = session.query(Engineer).filter(Engineer.name=="E4").first()
- manager3 = session.query(Manager).filter(Manager.name=="M3").first()
+ engineer4 = session.query(Engineer).\
+ filter(Engineer.name=="E4").first()
+ manager3 = session.query(Manager).\
+ filter(Manager.name=="M3").first()
car1 = Car(employee=engineer4)
session.add(car1)
session.expunge_all()
def go():
- testcar = session.query(Car).options(joinedload('employee')).get(car1.car_id)
+ testcar = session.query(Car).options(
+ joinedload('employee')
+ ).get(car1.car_id)
assert str(testcar.employee) == "Engineer E4, status X"
self.assert_sql_count(testing.db, go, 1)
- print "----------------------------"
car1 = session.query(Car).get(car1.car_id)
- print "----------------------------"
usingGet = session.query(person_mapper).get(car1.owner)
- print "----------------------------"
usingProperty = car1.employee
- print "----------------------------"
- # All print should output the same person (engineer E4)
assert str(engineer4) == "Engineer E4, status X"
- print str(usingGet)
assert str(usingGet) == "Engineer E4, status X"
assert str(usingProperty) == "Engineer E4, status X"
session.expunge_all()
- print "-----------------------------------------------------------------"
# and now for the lightning round, eager !
def go():
- testcar = session.query(Car).options(joinedload('employee')).get(car1.car_id)
+ testcar = session.query(Car).options(
+ joinedload('employee')
+ ).get(car1.car_id)
assert str(testcar.employee) == "Engineer E4, status X"
self.assert_sql_count(testing.db, go, 1)
def define_tables(cls, metadata):
global people, engineers, managers, cars
people = Table('people', metadata,
- Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('person_id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('name', String(50)),
Column('type', String(50)))
engineers = Table('engineers', metadata,
- Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
+ Column('person_id', Integer, ForeignKey('people.person_id'),
+ primary_key=True),
Column('status', String(30)))
managers = Table('managers', metadata,
- Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
+ Column('person_id', Integer, ForeignKey('people.person_id'),
+ primary_key=True),
Column('longer_status', String(70)))
cars = Table('cars', metadata,
- Column('car_id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('car_id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('owner', Integer, ForeignKey('people.person_id')))
def test_eager_empty(self):
- """test parent object with child relationship to an inheriting mapper, using eager loads,
- works when there are no child objects present"""
+ """test parent object with child relationship to an inheriting mapper,
+ using eager loads, works when there are no child objects present"""
class Person(object):
def __init__(self, **kwargs):
return "Ordinary person %s" % self.name
class Engineer(Person):
def __repr__(self):
- return "Engineer %s, status %s" % (self.name, self.status)
+ return "Engineer %s, status %s" % \
+ (self.name, self.status)
class Manager(Person):
def __repr__(self):
- return "Manager %s, status %s" % (self.name, self.longer_status)
+ return "Manager %s, status %s" % \
+ (self.name, self.longer_status)
class Car(object):
def __init__(self, **kwargs):
for key, value in kwargs.iteritems():
def __repr__(self):
return "Car number %d" % self.car_id
- person_mapper = mapper(Person, people, polymorphic_on=people.c.type, polymorphic_identity='person')
- engineer_mapper = mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer')
- manager_mapper = mapper(Manager, managers, inherits=person_mapper, polymorphic_identity='manager')
- car_mapper = mapper(Car, cars, properties= {'manager':relationship(manager_mapper, lazy='joined')})
+ person_mapper = mapper(Person, people,
+ polymorphic_on=people.c.type,
+ polymorphic_identity='person')
+ engineer_mapper = mapper(Engineer, engineers,
+ inherits=person_mapper,
+ polymorphic_identity='engineer')
+ manager_mapper = mapper(Manager, managers,
+ inherits=person_mapper,
+ polymorphic_identity='manager')
+ car_mapper = mapper(Car, cars, properties= {
+ 'manager':relationship(
+ manager_mapper, lazy='joined')})
sess = create_session()
car1 = Car()
assert carlist[1].manager.person_id == car2.manager.person_id
class RelationshipTest6(fixtures.MappedTest):
- """test self-referential relationships on a single joined-table inheritance mapper"""
+ """test self-referential relationships on a single joined-table
+ inheritance mapper"""
+
@classmethod
def define_tables(cls, metadata):
global people, managers, data
people = Table('people', metadata,
- Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('person_id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('name', String(50)),
)
managers = Table('managers', metadata,
- Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
- Column('colleague_id', Integer, ForeignKey('managers.person_id')),
+ Column('person_id', Integer, ForeignKey('people.person_id'),
+ primary_key=True),
+ Column('colleague_id', Integer,
+ ForeignKey('managers.person_id')),
Column('status', String(30)),
)
- def testbasic(self):
+ def test_basic(self):
class Person(AttrSettable):
pass
class Manager(Person):
pass
mapper(Person, people)
- # relationship is from people.join(managers) -> people.join(managers). self referential logic
- # needs to be used to figure out the lazy clause, meaning create_lazy_clause must go from parent.mapped_table
- # to parent.mapped_table
- mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id,
+
+ mapper(Manager, managers, inherits=Person,
+ inherit_condition=people.c.person_id==\
+ managers.c.person_id,
properties={
- 'colleague':relationship(Manager, primaryjoin=managers.c.colleague_id==managers.c.person_id, lazy='select', uselist=False)
+ 'colleague':relationship(Manager,
+ primaryjoin=managers.c.colleague_id==\
+ managers.c.person_id,
+ lazy='select', uselist=False)
}
)
def define_tables(cls, metadata):
global people, engineers, managers, cars, offroad_cars
cars = Table('cars', metadata,
- Column('car_id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('car_id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('name', String(30)))
offroad_cars = Table('offroad_cars', metadata,
- Column('car_id',Integer, ForeignKey('cars.car_id'),nullable=False,primary_key=True))
+ Column('car_id',Integer, ForeignKey('cars.car_id'),
+ nullable=False,primary_key=True))
people = Table('people', metadata,
- Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('car_id', Integer, ForeignKey('cars.car_id'), nullable=False),
+ Column('person_id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('car_id', Integer, ForeignKey('cars.car_id'),
+ nullable=False),
Column('name', String(50)))
engineers = Table('engineers', metadata,
- Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
+ Column('person_id', Integer, ForeignKey('people.person_id'),
+ primary_key=True),
Column('field', String(30)))
managers = Table('managers', metadata,
- Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
+ Column('person_id', Integer, ForeignKey('people.person_id'),
+ primary_key=True),
Column('category', String(70)))
@testing.uses_deprecated("fold_equivalents is deprecated.")
def test_manytoone_lazyload(self):
- """test that lazy load clause to a polymorphic child mapper generates correctly [ticket:493]"""
+ """test that lazy load clause to a polymorphic child mapper generates
+ correctly [ticket:493]"""
class PersistentObject(object):
def __init__(self, **kwargs):
class Engineer(Person):
def __repr__(self):
- return "Engineer %s, field %s" % (self.name, self.field)
+ return "Engineer %s, field %s" % (self.name,
+ self.field)
class Manager(Person):
def __repr__(self):
- return "Manager %s, category %s" % (self.name, self.category)
+ return "Manager %s, category %s" % (self.name,
+ self.category)
class Car(PersistentObject):
def __repr__(self):
- return "Car number %d, name %s" % (self.car_id, self.name)
+ return "Car number %d, name %s" % \
+ (self.car_id, self.name)
class Offraod_Car(Car):
def __repr__(self):
- return "Offroad Car number %d, name %s" % (self.car_id,self.name)
+ return "Offroad Car number %d, name %s" % \
+ (self.car_id,self.name)
employee_join = polymorphic_union(
{
car_join = polymorphic_union(
{
- 'car' : cars.outerjoin(offroad_cars).select(offroad_cars.c.car_id == None, fold_equivalents=True),
+ 'car' : cars.outerjoin(offroad_cars).\
+ select(offroad_cars.c.car_id == None,
+ fold_equivalents=True),
'offroad' : cars.join(offroad_cars)
}, "type", 'car_join')
with_polymorphic=('*', car_join) ,polymorphic_on=car_join.c.type,
polymorphic_identity='car',
)
- offroad_car_mapper = mapper(Offraod_Car, offroad_cars, inherits=car_mapper, polymorphic_identity='offroad')
+ offroad_car_mapper = mapper(Offraod_Car, offroad_cars,
+ inherits=car_mapper, polymorphic_identity='offroad')
person_mapper = mapper(Person, people,
- with_polymorphic=('*', employee_join), polymorphic_on=employee_join.c.type,
+ with_polymorphic=('*', employee_join),
+ polymorphic_on=employee_join.c.type,
polymorphic_identity='person',
properties={
'car':relationship(car_mapper)
})
- engineer_mapper = mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer')
- manager_mapper = mapper(Manager, managers, inherits=person_mapper, polymorphic_identity='manager')
+ engineer_mapper = mapper(Engineer, engineers,
+ inherits=person_mapper,
+ polymorphic_identity='engineer')
+ manager_mapper = mapper(Manager, managers,
+ inherits=person_mapper,
+ polymorphic_identity='manager')
session = create_session()
basic_car=Car(name="basic")
car=Car()
else:
car=Offraod_Car()
- session.add(Manager(name="M%d" % i,category="YYYYYYYYY",car=car))
+ session.add(Manager(name="M%d" % i,
+ category="YYYYYYYYY",car=car))
session.add(Engineer(name="E%d" % i,field="X",car=car))
session.flush()
session.expunge_all()
def define_tables(cls, metadata):
global taggable, users
taggable = Table('taggable', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('type', String(30)),
Column('owner_id', Integer, ForeignKey('taggable.id')),
)
users = Table ('users', metadata,
- Column('id', Integer, ForeignKey('taggable.id'), primary_key=True),
+ Column('id', Integer, ForeignKey('taggable.id'),
+ primary_key=True),
Column('data', String(50)),
)
class User(Taggable):
pass
- mapper( Taggable, taggable, polymorphic_on=taggable.c.type, polymorphic_identity='taggable', properties = {
+ mapper( Taggable, taggable,
+ polymorphic_on=taggable.c.type,
+ polymorphic_identity='taggable',
+ properties = {
'owner' : relationship (User,
- primaryjoin=taggable.c.owner_id ==taggable.c.id,
- remote_side=taggable.c.id
- ),
+ primaryjoin=taggable.c.owner_id ==taggable.c.id,
+ remote_side=taggable.c.id
+ ),
})
- mapper(User, users, inherits=Taggable, polymorphic_identity='user',
+ mapper(User, users, inherits=Taggable,
+ polymorphic_identity='user',
inherit_condition=users.c.id == taggable.c.id,
)
metadata = MetaData(testing.db)
# table definitions
status = Table('status', metadata,
- Column('status_id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('status_id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('name', String(20)))
people = Table('people', metadata,
- Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('status_id', Integer, ForeignKey('status.status_id'), nullable=False),
+ Column('person_id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('status_id', Integer, ForeignKey('status.status_id'),
+ nullable=False),
Column('name', String(50)))
engineers = Table('engineers', metadata,
- Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
+ Column('person_id', Integer, ForeignKey('people.person_id'),
+ primary_key=True),
Column('field', String(30)))
managers = Table('managers', metadata,
- Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
+ Column('person_id', Integer, ForeignKey('people.person_id'),
+ primary_key=True),
Column('category', String(70)))
cars = Table('cars', metadata,
- Column('car_id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('status_id', Integer, ForeignKey('status.status_id'), nullable=False),
- Column('owner', Integer, ForeignKey('people.person_id'), nullable=False))
+ Column('car_id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('status_id', Integer, ForeignKey('status.status_id'),
+ nullable=False),
+ Column('owner', Integer, ForeignKey('people.person_id'),
+ nullable=False))
metadata.create_all()
return "Ordinary person %s" % self.name
class Engineer(Person):
def __repr__(self):
- return "Engineer %s, field %s, status %s" % (self.name, self.field, self.status)
+ return "Engineer %s, field %s, status %s" % (
+ self.name, self.field, self.status)
class Manager(Person):
def __repr__(self):
- return "Manager %s, category %s, status %s" % (self.name, self.category, self.status)
+ return "Manager %s, category %s, status %s" % (
+ self.name, self.category, self.status)
class Car(PersistentObject):
def __repr__(self):
return "Car number %d" % self.car_id
status_mapper = mapper(Status, status)
person_mapper = mapper(Person, people,
- with_polymorphic=('*', employee_join), polymorphic_on=employee_join.c.type,
- polymorphic_identity='person', properties={'status':relationship(status_mapper)})
- engineer_mapper = mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer')
- manager_mapper = mapper(Manager, managers, inherits=person_mapper, polymorphic_identity='manager')
- car_mapper = mapper(Car, cars, properties= {'employee':relationship(person_mapper), 'status':relationship(status_mapper)})
+ with_polymorphic=('*', employee_join),
+ polymorphic_on=employee_join.c.type,
+ polymorphic_identity='person',
+ properties={'status':relationship(status_mapper)})
+ engineer_mapper = mapper(Engineer, engineers,
+ inherits=person_mapper,
+ polymorphic_identity='engineer')
+ manager_mapper = mapper(Manager, managers,
+ inherits=person_mapper,
+ polymorphic_identity='manager')
+ car_mapper = mapper(Car, cars, properties= {
+ 'employee':relationship(person_mapper),
+ 'status':relationship(status_mapper)})
session = create_session()
session.add(dead)
session.flush()
- # TODO: we haven't created assertions for all the data combinations created here
+ # TODO: we haven't created assertions for all
+ # the data combinations created here
- # creating 5 managers named from M1 to M5 and 5 engineers named from E1 to E5
+ # creating 5 managers named from M1 to M5
+ # and 5 engineers named from E1 to E5
# M4, M5, E4 and E5 are dead
for i in range(1,5):
if i<4:
st=active
else:
st=dead
- session.add(Manager(name="M%d" % i,category="YYYYYYYYY",status=st))
+ session.add(Manager(name="M%d" % i,
+ category="YYYYYYYYY",status=st))
session.add(Engineer(name="E%d" % i,field="X",status=st))
session.flush()
# get E4
- engineer4 = session.query(engineer_mapper).filter_by(name="E4").one()
+ engineer4 = session.query(engineer_mapper).\
+ filter_by(name="E4").one()
# create 2 cars for E4, one active and one dead
car1 = Car(employee=engineer4,status=active)
e = exists([Car.owner], Car.owner==employee_join.c.person_id)
Query(Person)._adapt_clause(employee_join, False, False)
- r = session.query(Person).filter(Person.name.like('%2')).join('status').filter_by(name="active").order_by(Person.person_id)
- eq_(str(list(r)), "[Manager M2, category YYYYYYYYY, status Status active, Engineer E2, field X, status Status active]")
- r = session.query(Engineer).join('status').filter(Person.name.in_(['E2', 'E3', 'E4', 'M4', 'M2', 'M1']) & (status.c.name=="active")).order_by(Person.name)
- eq_(str(list(r)), "[Engineer E2, field X, status Status active, Engineer E3, field X, status Status active]")
-
- r = session.query(Person).filter(exists([1], Car.owner==Person.person_id))
+ r = session.query(Person).filter(Person.name.like('%2')).\
+ join('status').\
+ filter_by(name="active").\
+ order_by(Person.person_id)
+ eq_(str(list(r)), "[Manager M2, category YYYYYYYYY, status "
+ "Status active, Engineer E2, field X, "
+ "status Status active]")
+ r = session.query(Engineer).join('status').\
+ filter(Person.name.in_(
+ ['E2', 'E3', 'E4', 'M4', 'M2', 'M1']) &
+ (status.c.name=="active")).order_by(Person.name)
+ eq_(str(list(r)), "[Engineer E2, field X, status Status "
+ "active, Engineer E3, field X, status "
+ "Status active]")
+
+ r = session.query(Person).filter(exists([1],
+ Car.owner==Person.person_id))
eq_(str(list(r)), "[Engineer E4, field X, status Status dead]")
class MultiLevelTest(fixtures.MappedTest):
global table_Employee, table_Engineer, table_Manager
table_Employee = Table( 'Employee', metadata,
Column( 'name', type_= String(100), ),
- Column( 'id', primary_key= True, type_= Integer, test_needs_autoincrement=True),
+ Column( 'id', primary_key= True, type_= Integer,
+ test_needs_autoincrement=True),
Column( 'atype', type_= String(100), ),
)
table_Engineer = Table( 'Engineer', metadata,
Column( 'machine', type_= String(100), ),
- Column( 'id', Integer, ForeignKey( 'Employee.id', ), primary_key= True, ),
+ Column( 'id', Integer, ForeignKey( 'Employee.id', ),
+ primary_key= True),
)
table_Manager = Table( 'Manager', metadata,
Column( 'duties', type_= String(100), ),
- Column( 'id', Integer, ForeignKey( 'Engineer.id', ), primary_key= True, ),
+ Column( 'id', Integer, ForeignKey( 'Engineer.id', ),
+ primary_key= True, ),
)
+
def test_threelevels(self):
class Employee( object):
def set( me, **kargs):
for k,v in kargs.iteritems(): setattr( me, k, v)
return me
- def __str__(me): return str(me.__class__.__name__)+':'+str(me.name)
+ def __str__(me):
+ return str(me.__class__.__name__)+':'+str(me.name)
__repr__ = __str__
- class Engineer( Employee): pass
- class Manager( Engineer): pass
+ class Engineer(Employee):
+ pass
+ class Manager(Engineer):
+ pass
pu_Employee = polymorphic_union( {
- 'Manager': table_Employee.join( table_Engineer).join( table_Manager),
- 'Engineer': select([table_Employee, table_Engineer.c.machine], table_Employee.c.atype == 'Engineer', from_obj=[table_Employee.join(table_Engineer)]),
- 'Employee': table_Employee.select( table_Employee.c.atype == 'Employee'),
+ 'Manager': table_Employee.join(
+ table_Engineer).join( table_Manager),
+ 'Engineer': select([table_Employee,
+ table_Engineer.c.machine],
+ table_Employee.c.atype == 'Engineer',
+ from_obj=[
+ table_Employee.join(table_Engineer)]),
+ 'Employee': table_Employee.select(
+ table_Employee.c.atype == 'Employee'),
}, None, 'pu_employee', )
-# pu_Employee = polymorphic_union( {
-# 'Manager': table_Employee.join( table_Engineer).join( table_Manager),
-# 'Engineer': table_Employee.join(table_Engineer).select(table_Employee.c.atype == 'Engineer'),
-# 'Employee': table_Employee.select( table_Employee.c.atype == 'Employee'),
-# }, None, 'pu_employee', )
-
mapper_Employee = mapper( Employee, table_Employee,
polymorphic_identity= 'Employee',
polymorphic_on= pu_Employee.c.atype,
)
pu_Engineer = polymorphic_union( {
- 'Manager': table_Employee.join( table_Engineer).join( table_Manager),
- 'Engineer': select([table_Employee, table_Engineer.c.machine], table_Employee.c.atype == 'Engineer', from_obj=[table_Employee.join(table_Engineer)]),
+ 'Manager': table_Employee.join( table_Engineer).
+ join( table_Manager),
+ 'Engineer': select([table_Employee,
+ table_Engineer.c.machine],
+ table_Employee.c.atype == 'Engineer',
+ from_obj=[
+ table_Employee.join(table_Engineer)
+ ]),
}, None, 'pu_engineer', )
mapper_Engineer = mapper( Engineer, table_Engineer,
- inherit_condition= table_Engineer.c.id == table_Employee.c.id,
+ inherit_condition= table_Engineer.c.id == \
+ table_Employee.c.id,
inherits= mapper_Employee,
polymorphic_identity= 'Engineer',
polymorphic_on= pu_Engineer.c.atype,
)
mapper_Manager = mapper( Manager, table_Manager,
- inherit_condition= table_Manager.c.id == table_Engineer.c.id,
+ inherit_condition= table_Manager.c.id == \
+ table_Engineer.c.id,
inherits= mapper_Engineer,
polymorphic_identity= 'Manager',
)
a = Employee().set( name= 'one')
b = Engineer().set( egn= 'two', machine= 'any')
- c = Manager().set( name= 'head', machine= 'fast', duties= 'many')
+ c = Manager().set( name= 'head', machine= 'fast',
+ duties= 'many')
session = create_session()
session.add(a)
class ManyToManyPolyTest(fixtures.MappedTest):
@classmethod
def define_tables(cls, metadata):
- global base_item_table, item_table, base_item_collection_table, collection_table
+ global base_item_table, item_table, base_item_collection_table, \
+ collection_table
base_item_table = Table(
'base_item', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('child_name', String(255), default=None))
item_table = Table(
'item', metadata,
- Column('id', Integer, ForeignKey('base_item.id'), primary_key=True),
- Column('dummy', Integer, default=0)) # Dummy column to avoid weird insert problems
+ Column('id', Integer, ForeignKey('base_item.id'),
+ primary_key=True),
+ Column('dummy', Integer, default=0))
base_item_collection_table = Table(
'base_item_collection', metadata,
collection_table = Table(
'collection', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('name', Unicode(255)))
def test_pjoin_compile(self):
- """test that remote_side columns in the secondary join table arent attempted to be
- matched to the target polymorphic selectable"""
+ """test that remote_side columns in the secondary join table
+ arent attempted to be matched to the target polymorphic
+ selectable"""
class BaseItem(object): pass
class Item(BaseItem): pass
class Collection(object): pass
item_join = polymorphic_union( {
- 'BaseItem':base_item_table.select(base_item_table.c.child_name=='BaseItem'),
+ 'BaseItem':base_item_table.select(
+ base_item_table.c.child_name=='BaseItem'),
'Item':base_item_table.join(item_table),
}, None, 'item_join')
with_polymorphic=('*', item_join),
polymorphic_on=base_item_table.c.child_name,
polymorphic_identity='BaseItem',
- properties=dict(collections=relationship(Collection, secondary=base_item_collection_table, backref="items")))
+ properties=dict(collections=relationship(Collection,
+ secondary=base_item_collection_table,
+ backref="items")))
mapper(
Item, item_table,
def define_tables(cls, metadata):
global t1, t2
t1 = Table('t1', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('type', String(30), nullable=False),
Column('data', String(30)))
# note that the primary key column in t2 is named differently
Column('t2data', String(30)))
def test_custompk(self):
- """test that the primary_key attribute is propagated to the polymorphic mapper"""
+ """test that the primary_key attribute is propagated to the
+ polymorphic mapper"""
class T1(object):pass
class T2(T1):pass
d['t2'] = t1.join(t2)
pjoin = polymorphic_union(d, None, 'pjoin')
- mapper(T1, t1, polymorphic_on=t1.c.type, polymorphic_identity='t1', with_polymorphic=('*', pjoin), primary_key=[pjoin.c.id])
+ mapper(T1, t1, polymorphic_on=t1.c.type,
+ polymorphic_identity='t1',
+ with_polymorphic=('*', pjoin),
+ primary_key=[pjoin.c.id])
mapper(T2, t2, inherits=T1, polymorphic_identity='t2')
- print [str(c) for c in class_mapper(T1).primary_key]
ot1 = T1()
ot2 = T2()
sess = create_session()
sess.flush()
sess.expunge_all()
- # query using get(), using only one value. this requires the select_table mapper
+ # query using get(), using only one value.
+ # this requires the select_table mapper
# has the same single-col primary key.
assert sess.query(T1).get(ot1.id).id == ot1.id
sess.flush()
def test_pk_collapses(self):
- """test that a composite primary key attribute formed by a join is "collapsed" into its
- minimal columns"""
+ """test that a composite primary key attribute formed by a join
+ is "collapsed" into its minimal columns"""
class T1(object):pass
class T2(T1):pass
d['t2'] = t1.join(t2)
pjoin = polymorphic_union(d, None, 'pjoin')
- mapper(T1, t1, polymorphic_on=t1.c.type, polymorphic_identity='t1', with_polymorphic=('*', pjoin))
+ mapper(T1, t1, polymorphic_on=t1.c.type,
+ polymorphic_identity='t1',
+ with_polymorphic=('*', pjoin))
mapper(T2, t2, inherits=T1, polymorphic_identity='t2')
assert len(class_mapper(T1).primary_key) == 1
- print [str(c) for c in class_mapper(T1).primary_key]
ot1 = T1()
ot2 = T2()
sess = create_session()
sess.flush()
sess.expunge_all()
- # query using get(), using only one value. this requires the select_table mapper
+ # query using get(), using only one value. this requires the
+ # select_table mapper
# has the same single-col primary key.
assert sess.query(T1).get(ot1.id).id == ot1.id
global people, employees, tags, peopleTags
people = Table('people', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('_type', String(30), nullable=False),
)
employees = Table('employees', metadata,
- Column('id', Integer, ForeignKey('people.id'),primary_key=True),
+ Column('id', Integer, ForeignKey('people.id'),
+ primary_key=True),
)
tags = Table('tags', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('label', String(50), nullable=False),
)
peopleTags = Table('peopleTags', metadata,
- Column('person_id', Integer,ForeignKey('people.id')),
- Column('tag_id', Integer,ForeignKey('tags.id')),
+ Column('person_id', Integer,
+ ForeignKey('people.id')),
+ Column('tag_id', Integer,
+ ForeignKey('tags.id')),
)
def test_basic(self):
- """test that Query uses the full set of mapper._eager_loaders when generating SQL"""
+ """test that Query uses the full set of mapper._eager_loaders
+ when generating SQL"""
class Person(fixtures.ComparableEntity):
pass
def __init__(self, label):
self.label = label
- mapper(Person, people, polymorphic_on=people.c._type,polymorphic_identity='person', properties={
- 'tags': relationship(Tag, secondary=peopleTags,backref='people', lazy='joined')
+ mapper(Person, people, polymorphic_on=people.c._type,
+ polymorphic_identity='person', properties={
+ 'tags': relationship(Tag,
+ secondary=peopleTags,
+ backref='people', lazy='joined')
})
- mapper(Employee, employees, inherits=Person,polymorphic_identity='employee')
+ mapper(Employee, employees, inherits=Person,
+ polymorphic_identity='employee')
mapper(Tag, tags)
session = create_session()
session.expunge_all()
# query from Employee with limit, query needs to apply eager limiting subquery
- instance = session.query(Employee).filter_by(id=1).limit(1).first()
+ instance = session.query(Employee).\
+ filter_by(id=1).limit(1).first()
assert len(instance.tags) == 2
class MissingPolymorphicOnTest(fixtures.MappedTest):
self._roundtrip()
def test_joined_subclass_to_superclass(self):
- people, users, dudes = self.tables.people, self.tables.users, self.tables.dudes
- Person, User, Dude = self.classes.Person, self.classes.User, self.classes.Dude
+ people, users, dudes = self.tables.people, self.tables.users, \
+ self.tables.dudes
+ Person, User, Dude = self.classes.Person, self.classes.User, \
+ self.classes.Dude
mapper(Person, people,
polymorphic_on=people.c.type,
global companies, people, engineers, managers, boss
companies = Table('companies', metadata,
- Column('company_id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('company_id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('name', String(50)))
people = Table('people', metadata,
- Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('company_id', Integer, ForeignKey('companies.company_id'), nullable=False),
+ Column('person_id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('company_id', Integer, ForeignKey('companies.company_id'),
+ nullable=False),
Column('name', String(50)),
Column('type', String(30)))
engineers = Table('engineers', metadata,
- Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
+ Column('person_id', Integer, ForeignKey('people.person_id'),
+ primary_key=True),
Column('status', String(30)),
Column('engineer_name', String(50)),
Column('primary_language', String(50)),
)
managers = Table('managers', metadata,
- Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
+ Column('person_id', Integer, ForeignKey('people.person_id'),
+ primary_key=True),
Column('status', String(30)),
Column('manager_name', String(50))
)
boss = Table('boss', metadata,
- Column('boss_id', Integer, ForeignKey('managers.person_id'), primary_key=True),
+ Column('boss_id', Integer, ForeignKey('managers.person_id'),
+ primary_key=True),
Column('golf_swing', String(30)),
)
polymorphic_on=person_join.c.type,
polymorphic_identity='person')
- mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer')
- mapper(Manager, managers, inherits=person_mapper, polymorphic_identity='manager')
+ mapper(Engineer, engineers, inherits=person_mapper,
+ polymorphic_identity='engineer')
+ mapper(Manager, managers, inherits=person_mapper,
+ polymorphic_identity='manager')
mapper(Company, companies, properties={
'employees': relationship(Person,
backref='company',
class RoundTripTest(PolymorphTest):
pass
-def _generate_round_trip_test(include_base, lazy_relationship, redefine_colprop, with_polymorphic):
+def _generate_round_trip_test(include_base, lazy_relationship,
+ redefine_colprop, with_polymorphic):
"""generates a round trip test.
-
- include_base - whether or not to include the base 'person' type in the union.
- lazy_relationship - whether or not the Company relationship to People is lazy or eager.
- redefine_colprop - if we redefine the 'name' column to be 'people_name' on the base Person class
+
+ include_base - whether or not to include the base 'person' type in
+ the union.
+
+ lazy_relationship - whether or not the Company relationship to
+ People is lazy or eager.
+
+ redefine_colprop - if we redefine the 'name' column to be
+ 'people_name' on the base Person class
+
use_literal_join - primary join condition is explicitly specified
"""
def test_roundtrip(self):
person_with_polymorphic = ['*', person_join]
manager_with_polymorphic = ['*', manager_join]
elif with_polymorphic == 'joins':
- person_join = people.outerjoin(engineers).outerjoin(managers).outerjoin(boss)
+ person_join = people.outerjoin(engineers).outerjoin(managers).\
+ outerjoin(boss)
manager_join = people.join(managers).outerjoin(boss)
person_with_polymorphic = ['*', person_join]
manager_with_polymorphic = ['*', manager_join]
if redefine_colprop:
person_mapper = mapper(Person, people,
- with_polymorphic=person_with_polymorphic,
- polymorphic_on=people.c.type,
- polymorphic_identity='person',
- properties= {'person_name':people.c.name})
+ with_polymorphic=person_with_polymorphic,
+ polymorphic_on=people.c.type,
+ polymorphic_identity='person',
+ properties= {'person_name':people.c.name})
else:
person_mapper = mapper(Person, people,
- with_polymorphic=person_with_polymorphic,
- polymorphic_on=people.c.type,
- polymorphic_identity='person')
+ with_polymorphic=person_with_polymorphic,
+ polymorphic_on=people.c.type,
+ polymorphic_identity='person')
- mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer')
- mapper(Manager, managers, inherits=person_mapper, with_polymorphic=manager_with_polymorphic, polymorphic_identity='manager')
+ mapper(Engineer, engineers, inherits=person_mapper,
+ polymorphic_identity='engineer')
+ mapper(Manager, managers, inherits=person_mapper,
+ with_polymorphic=manager_with_polymorphic,
+ polymorphic_identity='manager')
mapper(Boss, boss, inherits=Manager, polymorphic_identity='boss')
person_attribute_name = 'name'
employees = [
- Manager(status='AAB', manager_name='manager1', **{person_attribute_name:'pointy haired boss'}),
- Engineer(status='BBA', engineer_name='engineer1', primary_language='java', **{person_attribute_name:'dilbert'}),
+ Manager(status='AAB', manager_name='manager1',
+ **{person_attribute_name:'pointy haired boss'}),
+ Engineer(status='BBA', engineer_name='engineer1',
+ primary_language='java',
+ **{person_attribute_name:'dilbert'}),
]
if include_base:
employees.append(Person(**{person_attribute_name:'joesmith'}))
employees += [
- Engineer(status='CGG', engineer_name='engineer2', primary_language='python', **{person_attribute_name:'wally'}),
- Manager(status='ABA', manager_name='manager2', **{person_attribute_name:'jsmith'})
+ Engineer(status='CGG', engineer_name='engineer2',
+ primary_language='python',
+ **{person_attribute_name:'wally'}),
+ Manager(status='ABA', manager_name='manager2',
+ **{person_attribute_name:'jsmith'})
]
pointy = employees[0]
eq_(session.query(Person).get(dilbert.person_id), dilbert)
session.expunge_all()
- eq_(session.query(Person).filter(Person.person_id==dilbert.person_id).one(), dilbert)
+ eq_(session.query(Person).filter(
+ Person.person_id==dilbert.person_id).one(),
+ dilbert)
session.expunge_all()
def go():
else:
self.assert_sql_count(testing.db, go, 6)
- # test selecting from the query, using the base mapped table (people) as the selection criterion.
- # in the case of the polymorphic Person query, the "people" selectable should be adapted to be "person_join"
+ # test selecting from the query, using the base
+ # mapped table (people) as the selection criterion.
+ # in the case of the polymorphic Person query,
+ # the "people" selectable should be adapted to be "person_join"
eq_(
- session.query(Person).filter(getattr(Person, person_attribute_name)=='dilbert').first(),
+ session.query(Person).filter(
+ getattr(Person, person_attribute_name)=='dilbert'
+ ).first(),
dilbert
)
- assert session.query(Person).filter(getattr(Person, person_attribute_name)=='dilbert').first().person_id
+ assert session.query(Person).filter(
+ getattr(Person, person_attribute_name)=='dilbert'
+ ).first().person_id
eq_(
- session.query(Engineer).filter(getattr(Person, person_attribute_name)=='dilbert').first(),
+ session.query(Engineer).filter(
+ getattr(Person, person_attribute_name)=='dilbert'
+ ).first(),
dilbert
)
- # test selecting from the query, joining against an alias of the base "people" table. test that
- # the "palias" alias does *not* get sucked up into the "person_join" conversion.
+ # test selecting from the query, joining against
+ # an alias of the base "people" table. test that
+ # the "palias" alias does *not* get sucked up
+ # into the "person_join" conversion.
palias = people.alias("palias")
dilbert = session.query(Person).get(dilbert.person_id)
- assert dilbert is session.query(Person).filter((palias.c.name=='dilbert') & (palias.c.person_id==Person.person_id)).first()
- assert dilbert is session.query(Engineer).filter((palias.c.name=='dilbert') & (palias.c.person_id==Person.person_id)).first()
- assert dilbert is session.query(Person).filter((Engineer.engineer_name=="engineer1") & (engineers.c.person_id==people.c.person_id)).first()
- assert dilbert is session.query(Engineer).filter(Engineer.engineer_name=="engineer1")[0]
-
- dilbert.engineer_name = 'hes dibert!'
+ assert dilbert is session.query(Person).filter(
+ (palias.c.name=='dilbert') & \
+ (palias.c.person_id==Person.person_id)).first()
+ assert dilbert is session.query(Engineer).filter(
+ (palias.c.name=='dilbert') & \
+ (palias.c.person_id==Person.person_id)).first()
+ assert dilbert is session.query(Person).filter(
+ (Engineer.engineer_name=="engineer1") & \
+ (engineers.c.person_id==people.c.person_id)
+ ).first()
+ assert dilbert is session.query(Engineer).\
+ filter(Engineer.engineer_name=="engineer1")[0]
session.flush()
session.expunge_all()
def go():
- session.query(Person).filter(getattr(Person, person_attribute_name)=='dilbert').first()
+ session.query(Person).filter(getattr(Person,
+ person_attribute_name)=='dilbert').first()
self.assert_sql_count(testing.db, go, 1)
session.expunge_all()
- dilbert = session.query(Person).filter(getattr(Person, person_attribute_name)=='dilbert').first()
+ dilbert = session.query(Person).filter(getattr(Person,
+ person_attribute_name)=='dilbert').first()
def go():
- # assert that only primary table is queried for already-present-in-session
- d = session.query(Person).filter(getattr(Person, person_attribute_name)=='dilbert').first()
+ # assert that only primary table is queried for
+ # already-present-in-session
+ d = session.query(Person).filter(getattr(Person,
+ person_attribute_name)=='dilbert').first()
self.assert_sql_count(testing.db, go, 1)
# test standalone orphans
- daboss = Boss(status='BBB', manager_name='boss', golf_swing='fore', **{person_attribute_name:'daboss'})
+ daboss = Boss(status='BBB',
+ manager_name='boss',
+ golf_swing='fore',
+ **{person_attribute_name:'daboss'})
session.add(daboss)
assert_raises(sa_exc.DBAPIError, session.flush)
c = session.query(Company).first()
daboss.company = c
- manager_list = [e for e in c.employees if isinstance(e, Manager)]
+ manager_list = [e for e in c.employees
+ if isinstance(e, Manager)]
session.flush()
session.expunge_all()
- eq_(session.query(Manager).order_by(Manager.person_id).all(), manager_list)
+ eq_(session.query(Manager).order_by(Manager.person_id).all(),
+ manager_list)
c = session.query(Company).first()
session.delete(c)
for with_polymorphic in ['unions', 'joins', 'auto', 'none']:
if with_polymorphic == 'unions':
for include_base in [True, False]:
- _generate_round_trip_test(include_base, lazy_relationship, redefine_colprop, with_polymorphic)
+ _generate_round_trip_test(include_base,
+ lazy_relationship,
+ redefine_colprop, with_polymorphic)
else:
- _generate_round_trip_test(False, lazy_relationship, redefine_colprop, with_polymorphic)
+ _generate_round_trip_test(False,
+ lazy_relationship,
+ redefine_colprop, with_polymorphic)
-from sqlalchemy import *
-from sqlalchemy.orm import *
-from sqlalchemy.orm import interfaces
+from sqlalchemy import Integer, String, ForeignKey, func, desc, and_
+from sqlalchemy.orm import interfaces, relationship, mapper, \
+ clear_mappers, create_session, joinedload, joinedload_all, \
+ subqueryload, subqueryload_all, polymorphic_union, aliased,\
+ class_mapper
from sqlalchemy import exc as sa_exc
from sqlalchemy.engine import default
count = {'':14, 'Polymorphic':9}.get(select_type, 10)
self.assert_sql_count(testing.db, go, count)
- def test_primary_eager_aliasing(self):
+ def test_primary_eager_aliasing_one(self):
# For both joinedload() and subqueryload(), if the original q is
# not loading the subclass table, the joinedload doesn't happen.
count = {'':6, 'Polymorphic':3}.get(select_type, 4)
self.assert_sql_count(testing.db, go, count)
+ def test_primary_eager_aliasing_two(self):
sess = create_session()
def go():
eq_(sess.query(Person)
count = {'':14, 'Polymorphic':7}.get(select_type, 8)
self.assert_sql_count(testing.db, go, count)
+ def test_primary_eager_aliasing_three(self):
+
# assert the JOINs don't over JOIN
sess = create_session()
.subquery().count().scalar(),
2)
- def test_get(self):
+ def test_get_one(self):
"""
For all mappers, ensure the primary key has been calculated as
just the "person_id" column.
sess = create_session()
eq_(sess.query(Person).get(e1.person_id),
Engineer(name="dilbert", primary_language="java"))
+
+ def test_get_two(self):
+ sess = create_session()
eq_(sess.query(Engineer).get(e1.person_id),
Engineer(name="dilbert", primary_language="java"))
+
+ def test_get_three(self):
+ sess = create_session()
eq_(sess.query(Manager).get(b1.person_id),
Boss(name="pointy haired boss", golf_swing="fore"))
)
])
- def test_filter_on_subclass(self):
+ def test_filter_on_subclass_one(self):
sess = create_session()
eq_(sess.query(Engineer).all()[0], Engineer(name="dilbert"))
+
+ def test_filter_on_subclass_two(self):
+ sess = create_session()
eq_(sess.query(Engineer).first(), Engineer(name="dilbert"))
+
+ def test_filter_on_subclass_three(self):
+ sess = create_session()
eq_(sess.query(Engineer)
.filter(Engineer.person_id == e1.person_id).first(),
Engineer(name="dilbert"))
+
+ def test_filter_on_subclass_four(self):
+ sess = create_session()
eq_(sess.query(Manager)
.filter(Manager.person_id == m1.person_id).one(),
Manager(name="dogbert"))
+
+ def test_filter_on_subclass_five(self):
+ sess = create_session()
eq_(sess.query(Manager)
.filter(Manager.person_id == b1.person_id).one(),
Boss(name="pointy haired boss"))
+
+ def test_filter_on_subclass_six(self):
+ sess = create_session()
eq_(sess.query(Boss)
.filter(Boss.person_id == b1.person_id).one(),
Boss(name="pointy haired boss"))
- def test_join_from_polymorphic(self):
+ def test_join_from_polymorphic_nonaliased_one(self):
sess = create_session()
- for x in (True, False):
- eq_(sess.query(Person)
- .join('paperwork', aliased=x)
- .filter(Paperwork.description.like('%review%')).all(),
- [b1, m1])
- eq_(sess.query(Person)
- .join('paperwork', aliased=x)
- .filter(Paperwork.description.like('%#2%')).all(),
- [e1, m1])
- eq_(sess.query(Engineer)
- .join('paperwork', aliased=x)
- .filter(Paperwork.description.like('%#2%')).all(),
- [e1])
- eq_(sess.query(Person)
- .join('paperwork', aliased=x)
- .filter(Person.name.like('%dog%'))
- .filter(Paperwork.description.like('%#2%')).all(),
- [m1])
+ eq_(sess.query(Person)
+ .join('paperwork', aliased=False)
+ .filter(Paperwork.description.like('%review%')).all(),
+ [b1, m1])
- def test_join_from_with_polymorphic(self):
+ def test_join_from_polymorphic_nonaliased_two(self):
sess = create_session()
- for aliased in (True, False):
- sess.expunge_all()
- eq_(sess.query(Person)
- .with_polymorphic(Manager)
- .join('paperwork', aliased=aliased)
- .filter(Paperwork.description.like('%review%')).all(),
- [b1, m1])
- sess.expunge_all()
- eq_(sess.query(Person)
- .with_polymorphic([Manager, Engineer])
- .join('paperwork', aliased=aliased)
- .filter(Paperwork.description.like('%#2%')).all(),
- [e1, m1])
- sess.expunge_all()
- eq_(sess.query(Person)
- .with_polymorphic([Manager, Engineer])
- .join('paperwork', aliased=aliased)
- .filter(Person.name.like('%dog%'))
- .filter(Paperwork.description.like('%#2%')).all(),
- [m1])
+ eq_(sess.query(Person)
+ .join('paperwork', aliased=False)
+ .filter(Paperwork.description.like('%#2%')).all(),
+ [e1, m1])
+
+ def test_join_from_polymorphic_nonaliased_three(self):
+ sess = create_session()
+ eq_(sess.query(Engineer)
+ .join('paperwork', aliased=False)
+ .filter(Paperwork.description.like('%#2%')).all(),
+ [e1])
+
+ def test_join_from_polymorphic_nonaliased_four(self):
+ sess = create_session()
+ eq_(sess.query(Person)
+ .join('paperwork', aliased=False)
+ .filter(Person.name.like('%dog%'))
+ .filter(Paperwork.description.like('%#2%')).all(),
+ [m1])
+
+ def test_join_from_polymorphic_aliased_one(self):
+ sess = create_session()
+ eq_(sess.query(Person)
+ .join('paperwork', aliased=True)
+ .filter(Paperwork.description.like('%review%')).all(),
+ [b1, m1])
+
+ def test_join_from_polymorphic_aliased_two(self):
+ sess = create_session()
+ eq_(sess.query(Person)
+ .join('paperwork', aliased=True)
+ .filter(Paperwork.description.like('%#2%')).all(),
+ [e1, m1])
+
+ def test_join_from_polymorphic_aliased_three(self):
+ sess = create_session()
+ eq_(sess.query(Engineer)
+ .join('paperwork', aliased=True)
+ .filter(Paperwork.description.like('%#2%')).all(),
+ [e1])
+
+ def test_join_from_polymorphic_aliased_four(self):
+ sess = create_session()
+ eq_(sess.query(Person)
+ .join('paperwork', aliased=True)
+ .filter(Person.name.like('%dog%'))
+ .filter(Paperwork.description.like('%#2%')).all(),
+ [m1])
+
+ def test_join_from_with_polymorphic_nonaliased_one(self):
+ sess = create_session()
+ eq_(sess.query(Person)
+ .with_polymorphic(Manager)
+ .join('paperwork')
+ .filter(Paperwork.description.like('%review%')).all(),
+ [b1, m1])
+
+ def test_join_from_with_polymorphic_nonaliased_two(self):
+ sess = create_session()
+ eq_(sess.query(Person)
+ .with_polymorphic([Manager, Engineer])
+ .join('paperwork')
+ .filter(Paperwork.description.like('%#2%')).all(),
+ [e1, m1])
+
+ def test_join_from_with_polymorphic_nonaliased_three(self):
+ sess = create_session()
+ eq_(sess.query(Person)
+ .with_polymorphic([Manager, Engineer])
+ .join('paperwork')
+ .filter(Person.name.like('%dog%'))
+ .filter(Paperwork.description.like('%#2%')).all(),
+ [m1])
+
+
+ def test_join_from_with_polymorphic_aliased_one(self):
+ sess = create_session()
+ eq_(sess.query(Person)
+ .with_polymorphic(Manager)
+ .join('paperwork', aliased=True)
+ .filter(Paperwork.description.like('%review%')).all(),
+ [b1, m1])
- def test_join_to_polymorphic(self):
+ def test_join_from_with_polymorphic_aliased_two(self):
+ sess = create_session()
+ eq_(sess.query(Person)
+ .with_polymorphic([Manager, Engineer])
+ .join('paperwork', aliased=True)
+ .filter(Paperwork.description.like('%#2%')).all(),
+ [e1, m1])
+
+ def test_join_from_with_polymorphic_aliased_three(self):
+ sess = create_session()
+ eq_(sess.query(Person)
+ .with_polymorphic([Manager, Engineer])
+ .join('paperwork', aliased=True)
+ .filter(Person.name.like('%dog%'))
+ .filter(Paperwork.description.like('%#2%')).all(),
+ [m1])
+
+ def test_join_to_polymorphic_nonaliased(self):
sess = create_session()
eq_(sess.query(Company)
.join('employees')
.filter(Person.name == 'vlad').one(),
c2)
+
+ def test_join_to_polymorphic_aliased(self):
+ sess = create_session()
eq_(sess.query(Company)
.join('employees', aliased=True)
.filter(Person.name == 'vlad').one(),
c2)
- def test_polymorphic_any(self):
+ def test_polymorphic_any_one(self):
sess = create_session()
any_ = Company.employees.any(Person.name == 'vlad')
eq_(sess.query(Company).filter(any_).all(), [c2])
+ def test_polymorphic_any_two(self):
+ sess = create_session()
# test that the aliasing on "Person" does not bleed into the
# EXISTS clause generated by any()
any_ = Company.employees.any(Person.name == 'wally')
.filter(any_).all(),
[c1])
+ def test_polymorphic_any_three(self):
+ sess = create_session()
any_ = Company.employees.any(Person.name == 'vlad')
eq_(sess.query(Company)
.join(Company.employees, aliased=True)
.filter(any_).all(),
[])
+ def test_polymorphic_any_four(self):
+ sess = create_session()
any_ = Company.employees.of_type(Engineer).any(
Engineer.primary_language == 'cobol')
eq_(sess.query(Company).filter(any_).one(), c2)
+ def test_polymorphic_any_five(self):
+ sess = create_session()
calias = aliased(Company)
any_ = calias.employees.of_type(Engineer).any(
Engineer.primary_language == 'cobol')
eq_(sess.query(calias).filter(any_).one(), c2)
+ def test_polymorphic_any_six(self):
+ sess = create_session()
any_ = Company.employees.of_type(Boss).any(
Boss.golf_swing == 'fore')
eq_(sess.query(Company).filter(any_).one(), c1)
+ def test_polymorphic_any_seven(self):
+ sess = create_session()
any_ = Company.employees.of_type(Boss).any(
Manager.manager_name == 'pointy')
eq_(sess.query(Company).filter(any_).one(), c1)
+ def test_polymorphic_any_eight(self):
+ sess = create_session()
if select_type != '':
any_ = Engineer.machines.any(
Machine.name == "Commodore 64")
eq_(sess.query(Person).filter(any_).all(), [e2, e3])
+ def test_polymorphic_any_nine(self):
+ sess = create_session()
any_ = Person.paperwork.any(
Paperwork.description == "review #2")
eq_(sess.query(Person).filter(any_).all(), [m1])
+ def test_polymorphic_any_ten(self):
+ sess = create_session()
any_ = Company.employees.of_type(Engineer).any(
and_(Engineer.primary_language == 'cobol'))
eq_(sess.query(Company).filter(any_).one(), c2)
- def test_join_from_columns_or_subclass(self):
+ def test_join_from_columns_or_subclass_one(self):
sess = create_session()
expected = [
.order_by(Manager.name).all(),
expected)
+ def test_join_from_columns_or_subclass_two(self):
+ sess = create_session()
expected = [
(u'dogbert',),
(u'dogbert',),
.order_by(Manager.name).all(),
expected)
+ def test_join_from_columns_or_subclass_three(self):
+ sess = create_session()
expected = [
(u'dilbert',),
(u'dilbert',),
.order_by(Person.name).all(),
expected)
+ def test_join_from_columns_or_subclass_four(self):
+ sess = create_session()
# Load Person.name, joining from Person -> paperwork, get all
# the people.
expected = [
.order_by(Person.name).all(),
expected)
+ def test_join_from_columns_or_subclass_five(self):
+ sess = create_session()
# same, on manager. get only managers.
expected = [
(u'dogbert',),
.order_by(Person.name).all(),
expected)
+ def test_join_from_columns_or_subclass_six(self):
+ sess = create_session()
if select_type == '':
# this now raises, due to [ticket:1892]. Manager.person_id
# is now the "person_id" column on Manager. SQL is incorrect.
.order_by(Person.name).all(),
expected)
+ def test_join_from_columns_or_subclass_seven(self):
+ sess = create_session()
eq_(sess.query(Manager)
.join(Paperwork, Manager.paperwork)
.order_by(Manager.name).all(),
[m1, b1])
+ def test_join_from_columns_or_subclass_eight(self):
+ sess = create_session()
expected = [
(u'dogbert',),
(u'dogbert',),
.order_by(Manager.name).all(),
expected)
+ def test_join_from_columns_or_subclass_nine(self):
+ sess = create_session()
eq_(sess.query(Manager.person_id)
.join(paperwork,
Manager.person_id == paperwork.c.person_id)
.order_by(Manager.name).all(),
[(4,), (4,), (3,)])
+ def test_join_from_columns_or_subclass_ten(self):
+ sess = create_session()
expected = [
(u'pointy haired boss', u'review #1'),
(u'dogbert', u'review #2'),
.order_by(Paperwork.paperwork_id).all(),
expected)
+ def test_join_from_columns_or_subclass_eleven(self):
+ sess = create_session()
expected = [
(u'pointy haired boss',),
(u'dogbert',),
sess.expire(m2, ['manager_name', 'golf_swing'])
assert m2.golf_swing == 'fore'
- def test_with_polymorphic(self):
- sess = create_session()
-
- assert_raises(sa_exc.InvalidRequestError,
- sess.query(Person).with_polymorphic, Paperwork)
- assert_raises(sa_exc.InvalidRequestError,
- sess.query(Engineer).with_polymorphic, Boss)
- assert_raises(sa_exc.InvalidRequestError,
- sess.query(Engineer).with_polymorphic, Person)
-
- # compare to entities without related collections to prevent
- # additional lazy SQL from firing on loaded entities
- emps_without_relationships = [
+ def _emps_wo_relationships_fixture(self):
+ return [
Engineer(
name="dilbert",
engineer_name="dilbert",
primary_language="cobol",
status="elbonian engineer")
]
- eq_(sess.query(Person).with_polymorphic('*').all(),
- emps_without_relationships)
+ def test_with_polymorphic_one(self):
+ sess = create_session()
def go():
eq_(sess.query(Person)
.with_polymorphic(Engineer)
.filter(Engineer.primary_language == 'java').all(),
- emps_without_relationships[0:1])
+ self._emps_wo_relationships_fixture()[0:1])
self.assert_sql_count(testing.db, go, 1)
- sess.expunge_all()
+ def test_with_polymorphic_two(self):
+ sess = create_session()
def go():
eq_(sess.query(Person)
.with_polymorphic('*').all(),
- emps_without_relationships)
+ self._emps_wo_relationships_fixture())
self.assert_sql_count(testing.db, go, 1)
- sess.expunge_all()
+ def test_with_polymorphic_three(self):
+ sess = create_session()
def go():
eq_(sess.query(Person)
.with_polymorphic(Engineer).all(),
- emps_without_relationships)
+ self._emps_wo_relationships_fixture())
self.assert_sql_count(testing.db, go, 3)
- sess.expunge_all()
+ def test_with_polymorphic_four(self):
+ sess = create_session()
def go():
eq_(sess.query(Person)
.with_polymorphic(
Engineer,
people.outerjoin(engineers))
.all(),
- emps_without_relationships)
+ self._emps_wo_relationships_fixture())
self.assert_sql_count(testing.db, go, 3)
- sess.expunge_all()
+ def test_with_polymorphic_five(self):
+ sess = create_session()
def go():
# limit the polymorphic join down to just "Person",
# overriding select_table
eq_(sess.query(Person)
.with_polymorphic(Person).all(),
- emps_without_relationships)
+ self._emps_wo_relationships_fixture())
self.assert_sql_count(testing.db, go, 6)
+ def test_with_polymorphic_six(self):
+ sess = create_session()
+
+ assert_raises(sa_exc.InvalidRequestError,
+ sess.query(Person).with_polymorphic, Paperwork)
+ assert_raises(sa_exc.InvalidRequestError,
+ sess.query(Engineer).with_polymorphic, Boss)
+ assert_raises(sa_exc.InvalidRequestError,
+ sess.query(Engineer).with_polymorphic, Person)
+
+ def test_with_polymorphic_seven(self):
+ sess = create_session()
+ # compare to entities without related collections to prevent
+ # additional lazy SQL from firing on loaded entities
+ eq_(sess.query(Person).with_polymorphic('*').all(),
+ self._emps_wo_relationships_fixture())
+
+
def test_relationship_to_polymorphic(self):
expected = [
Company(
.filter(Engineer.primary_language == 'java').all(),
[c1])
- if select_type == '':
- eq_(sess.query(Company)
- .select_from(companies.join(people).join(engineers))
- .filter(Engineer.primary_language == 'java').all(),
- [c1])
+ def test_join_to_subclass_one(self):
+ sess = create_session()
+ eq_(sess.query(Company)
+ .select_from(companies.join(people).join(engineers))
+ .filter(Engineer.primary_language == 'java').all(),
+ [c1])
- eq_(sess.query(Company)
- .join(people.join(engineers), 'employees')
- .filter(Engineer.primary_language == 'java').all(),
- [c1])
+ def test_join_to_subclass_two(self):
+ sess = create_session()
+ eq_(sess.query(Company)
+ .join(people.join(engineers), 'employees')
+ .filter(Engineer.primary_language == 'java').all(),
+ [c1])
- ealias = aliased(Engineer)
- eq_(sess.query(Company)
- .join(ealias, 'employees')
- .filter(ealias.primary_language == 'java').all(),
- [c1])
+ def test_join_to_subclass_three(self):
+ sess = create_session()
+ ealias = aliased(Engineer)
+ eq_(sess.query(Company)
+ .join(ealias, 'employees')
+ .filter(ealias.primary_language == 'java').all(),
+ [c1])
+ if select_type == '':
+ def test_join_to_subclass_four(self):
+ sess = create_session()
eq_(sess.query(Person)
.select_from(people.join(engineers))
.join(Engineer.machines).all(),
[e1, e2, e3])
+ def test_join_to_subclass_five(self):
+ sess = create_session()
eq_(sess.query(Person)
.select_from(people.join(engineers))
.join(Engineer.machines)
.filter(Machine.name.ilike("%ibm%")).all(),
[e1, e3])
- eq_(sess.query(Company)
- .join(people.join(engineers), 'employees')
- .join(Engineer.machines).all(),
- [c1, c2])
+ def test_join_to_subclass_six(self):
+ sess = create_session()
+ eq_(sess.query(Company)
+ .join(people.join(engineers), 'employees')
+ .join(Engineer.machines).all(),
+ [c1, c2])
- eq_(sess.query(Company)
- .join(people.join(engineers), 'employees')
- .join(Engineer.machines)
- .filter(Machine.name.ilike("%thinkpad%")).all(),
- [c1])
- else:
- eq_(sess.query(Company)
- .select_from(companies.join(people).join(engineers))
- .filter(Engineer.primary_language == 'java').all(),
- [c1])
+ def test_join_to_subclass_seven(self):
+ sess = create_session()
+ eq_(sess.query(Company)
+ .join(people.join(engineers), 'employees')
+ .join(Engineer.machines)
+ .filter(Machine.name.ilike("%thinkpad%")).all(),
+ [c1])
+
+
+ def test_join_to_subclass_eight(self):
+ sess = create_session()
+ eq_(sess.query(Person)
+ .join(Engineer.machines).all(),
+ [e1, e2, e3])
+ def test_join_to_subclass_nine(self):
+ sess = create_session()
+ eq_(sess.query(Company)
+ .select_from(companies.join(people).join(engineers))
+ .filter(Engineer.primary_language == 'java').all(),
+ [c1])
+
+ if select_type != '':
+ def test_join_to_subclass_ten(self):
+ sess = create_session()
eq_(sess.query(Company)
.join('employees')
.filter(Engineer.primary_language == 'java').all(),
[c1])
- eq_(sess.query(Person)
- .join(Engineer.machines).all(),
- [e1, e2, e3])
+ def test_join_to_subclass_eleven(self):
+ sess = create_session()
+ eq_(sess.query(Company)
+ .select_from(companies.join(people).join(engineers))
+ .filter(Engineer.primary_language == 'java').all(),
+ [c1])
- eq_(sess.query(Person)
- .join(Engineer.machines)
- .filter(Machine.name.ilike("%ibm%")).all(),
- [e1, e3])
+ def test_join_to_subclass_twelve(self):
+ sess = create_session()
+ eq_(sess.query(Person)
+ .join(Engineer.machines).all(),
+ [e1, e2, e3])
- eq_(sess.query(Company)
- .join('employees', Engineer.machines).all(),
- [c1, c2])
+ def test_join_to_subclass_thirteen(self):
+ sess = create_session()
+ eq_(sess.query(Person)
+ .join(Engineer.machines)
+ .filter(Machine.name.ilike("%ibm%")).all(),
+ [e1, e3])
- eq_(sess.query(Company)
- .join('employees', Engineer.machines)
- .filter(Machine.name.ilike("%thinkpad%")).all(),
- [c1])
+ def test_join_to_subclass_fourteen(self):
+ sess = create_session()
+ eq_(sess.query(Company)
+ .join('employees', Engineer.machines).all(),
+ [c1, c2])
+ def test_join_to_subclass_fifteen(self):
+ sess = create_session()
+ eq_(sess.query(Company)
+ .join('employees', Engineer.machines)
+ .filter(Machine.name.ilike("%thinkpad%")).all(),
+ [c1])
+
+ def test_join_to_subclass_sixteen(self):
+ sess = create_session()
# non-polymorphic
eq_(sess.query(Engineer)
.join(Engineer.machines).all(),
[e1, e2, e3])
+ def test_join_to_subclass_seventeen(self):
+ sess = create_session()
eq_(sess.query(Engineer)
.join(Engineer.machines)
.filter(Machine.name.ilike("%ibm%")).all(),
[e1, e3])
+ def test_join_to_subclass_eightteen(self):
+ sess = create_session()
# here's the new way
eq_(sess.query(Company)
.join(Company.employees.of_type(Engineer))
.filter(Engineer.primary_language == 'java').all(),
[c1])
+ def test_join_to_subclass_nineteen(self):
+ sess = create_session()
eq_(sess.query(Company)
.join(Company.employees.of_type(Engineer), 'machines')
.filter(Machine.name.ilike("%thinkpad%")).all(),
.filter(Engineer.primary_language == 'java').count(),
1)
- def test_join_through_polymorphic(self):
+ def test_join_through_polymorphic_nonaliased_one(self):
sess = create_session()
- for x in (True, False):
- eq_(sess.query(Company)
- .join('employees', 'paperwork', aliased=x)
- .filter(Paperwork.description.like('%#2%')).all(),
- [c1])
+ eq_(sess.query(Company)
+ .join('employees', 'paperwork', aliased=False)
+ .filter(Paperwork.description.like('%#2%')).all(),
+ [c1])
- eq_(sess.query(Company)
- .join('employees', 'paperwork', aliased=x)
- .filter(Paperwork.description.like('%#%')).all(),
- [c1, c2])
+ def test_join_through_polymorphic_nonaliased_two(self):
+ sess = create_session()
+ eq_(sess.query(Company)
+ .join('employees', 'paperwork', aliased=False)
+ .filter(Paperwork.description.like('%#%')).all(),
+ [c1, c2])
- eq_(sess.query(Company)
- .join('employees', 'paperwork', aliased=x)
- .filter(Person.name.in_(['dilbert', 'vlad']))
- .filter(Paperwork.description.like('%#2%')).all(),
- [c1])
+ def test_join_through_polymorphic_nonaliased_three(self):
+ sess = create_session()
+ eq_(sess.query(Company)
+ .join('employees', 'paperwork', aliased=False)
+ .filter(Person.name.in_(['dilbert', 'vlad']))
+ .filter(Paperwork.description.like('%#2%')).all(),
+ [c1])
- eq_(sess.query(Company)
- .join('employees', 'paperwork', aliased=x)
- .filter(Person.name.in_(['dilbert', 'vlad']))
- .filter(Paperwork.description.like('%#%')).all(),
- [c1, c2])
+ def test_join_through_polymorphic_nonaliased_four(self):
+ sess = create_session()
+ eq_(sess.query(Company)
+ .join('employees', 'paperwork', aliased=False)
+ .filter(Person.name.in_(['dilbert', 'vlad']))
+ .filter(Paperwork.description.like('%#%')).all(),
+ [c1, c2])
- eq_(sess.query(Company)
- .join('employees', aliased=aliased)
- .filter(Person.name.in_(['dilbert', 'vlad']))
- .join('paperwork', from_joinpoint=True, aliased=x)
- .filter(Paperwork.description.like('%#2%')).all(),
- [c1])
+ def test_join_through_polymorphic_nonaliased_five(self):
+ sess = create_session()
+ eq_(sess.query(Company)
+ .join('employees', aliased=aliased)
+ .filter(Person.name.in_(['dilbert', 'vlad']))
+ .join('paperwork', from_joinpoint=True, aliased=False)
+ .filter(Paperwork.description.like('%#2%')).all(),
+ [c1])
- eq_(sess.query(Company)
- .join('employees', aliased=aliased)
- .filter(Person.name.in_(['dilbert', 'vlad']))
- .join('paperwork', from_joinpoint=True, aliased=x)
- .filter(Paperwork.description.like('%#%')).all(),
- [c1, c2])
+ def test_join_through_polymorphic_nonaliased_six(self):
+ sess = create_session()
+ eq_(sess.query(Company)
+ .join('employees', aliased=aliased)
+ .filter(Person.name.in_(['dilbert', 'vlad']))
+ .join('paperwork', from_joinpoint=True, aliased=False)
+ .filter(Paperwork.description.like('%#%')).all(),
+ [c1, c2])
+
+ def test_join_through_polymorphic_aliased_one(self):
+ sess = create_session()
+ eq_(sess.query(Company)
+ .join('employees', 'paperwork', aliased=True)
+ .filter(Paperwork.description.like('%#2%')).all(),
+ [c1])
+
+ def test_join_through_polymorphic_aliased_two(self):
+ sess = create_session()
+ eq_(sess.query(Company)
+ .join('employees', 'paperwork', aliased=True)
+ .filter(Paperwork.description.like('%#%')).all(),
+ [c1, c2])
+
+ def test_join_through_polymorphic_aliased_three(self):
+ sess = create_session()
+ eq_(sess.query(Company)
+ .join('employees', 'paperwork', aliased=True)
+ .filter(Person.name.in_(['dilbert', 'vlad']))
+ .filter(Paperwork.description.like('%#2%')).all(),
+ [c1])
+
+ def test_join_through_polymorphic_aliased_four(self):
+ sess = create_session()
+ eq_(sess.query(Company)
+ .join('employees', 'paperwork', aliased=True)
+ .filter(Person.name.in_(['dilbert', 'vlad']))
+ .filter(Paperwork.description.like('%#%')).all(),
+ [c1, c2])
+
+ def test_join_through_polymorphic_aliased_five(self):
+ sess = create_session()
+ eq_(sess.query(Company)
+ .join('employees', aliased=aliased)
+ .filter(Person.name.in_(['dilbert', 'vlad']))
+ .join('paperwork', from_joinpoint=True, aliased=True)
+ .filter(Paperwork.description.like('%#2%')).all(),
+ [c1])
+
+ def test_join_through_polymorphic_aliased_six(self):
+ sess = create_session()
+ eq_(sess.query(Company)
+ .join('employees', aliased=aliased)
+ .filter(Person.name.in_(['dilbert', 'vlad']))
+ .join('paperwork', from_joinpoint=True, aliased=True)
+ .filter(Paperwork.description.like('%#%')).all(),
+ [c1, c2])
def test_explicit_polymorphic_join(self):
sess = create_session()
.filter(Person.person_id.in_(subq)).one(),
e1)
- def test_mixed_entities(self):
- sess = create_session()
-
- expected = [(
- u'Elbonia, Inc.',
- Engineer(
- status=u'elbonian engineer',
- engineer_name=u'vlad',
- name=u'vlad',
- primary_language=u'cobol'))]
- eq_(sess.query(Company.name, Person)
- .join(Company.employees)
- .filter(Company.name == 'Elbonia, Inc.').all(),
- expected)
-
- expected = [(
- Engineer(
- status=u'elbonian engineer',
- engineer_name=u'vlad',
- name=u'vlad',
- primary_language=u'cobol'),
- u'Elbonia, Inc.')]
- eq_(sess.query(Person, Company.name)
- .join(Company.employees)
- .filter(Company.name == 'Elbonia, Inc.').all(),
- expected)
-
- expected = [('pointy haired boss',), ('dogbert',)]
- eq_(sess.query(Manager.name).all(), expected)
-
- expected = [('pointy haired boss foo',), ('dogbert foo',)]
- eq_(sess.query(Manager.name + " foo").all(), expected)
-
- row = sess.query(Engineer.name, Engineer.primary_language) \
- .filter(Engineer.name == 'dilbert').first()
- assert row.name == 'dilbert'
- assert row.primary_language == 'java'
-
- expected = [
- (u'dilbert', u'java'),
- (u'wally', u'c++'),
- (u'vlad', u'cobol')]
- eq_(sess.query(Engineer.name, Engineer.primary_language).all(),
- expected)
-
- expected = [(u'pointy haired boss', u'fore')]
- eq_(sess.query(Boss.name, Boss.golf_swing).all(), expected)
-
- # TODO: I think raise error on these for now. different
- # inheritance/loading schemes have different results here,
- # all incorrect
- #
- # eq_(
- # sess.query(Person.name, Engineer.primary_language).all(),
- # [])
- # eq_(sess.query(
- # Person.name,
- # Engineer.primary_language,
- # Manager.manager_name)
- # .all(),
- # [])
- expected = [(u'vlad', u'Elbonia, Inc.')]
- eq_(sess.query(Person.name, Company.name)
- .join(Company.employees)
- .filter(Company.name == 'Elbonia, Inc.').all(),
- expected)
+ if select_type != '':
+ def test_mixed_entities_one(self):
+ sess = create_session()
- expected = [(u'java',), (u'c++',), (u'cobol',)]
- eq_(sess.query(Engineer.primary_language)
- .filter(Person.type == 'engineer').all(),
- expected)
-
- if select_type != '':
expected = [
(Engineer(
status=u'regular engineer',
.filter(Person.type == 'engineer').all(),
expected)
+ if select_type != '':
+ def test_mixed_entities_two(self):
+ sess = create_session()
expected = [
(u'java', u'MegaCorp, Inc.'),
(u'cobol', u'Elbonia, Inc.'),
.order_by(desc(Engineer.primary_language)).all(),
expected)
+ def test_mixed_entities_three(self):
+ sess = create_session()
palias = aliased(Person)
expected = [(
Engineer(
.filter(palias.name == 'dilbert').all(),
expected)
+ def test_mixed_entities_four(self):
+ sess = create_session()
+ palias = aliased(Person)
expected = [(
Engineer(
status=u'regular engineer',
.filter(palias.name == 'dilbert').all(),
expected)
+ def test_mixed_entities_five(self):
+ sess = create_session()
+ palias = aliased(Person)
expected = [(u'vlad', u'Elbonia, Inc.', u'dilbert')]
eq_(sess.query(Person.name, Company.name, palias.name)
.join(Company.employees)
.filter(palias.name == 'dilbert').all(),
expected)
+ def test_mixed_entities_six(self):
+ sess = create_session()
palias = aliased(Person)
expected = [
(u'manager', u'dogbert', u'engineer', u'dilbert'),
.order_by(Person.person_id, palias.person_id).all(),
expected)
+ def test_mixed_entities_seven(self):
+ sess = create_session()
expected = [
(u'dilbert', u'tps report #1'),
(u'dilbert', u'tps report #2'),
.order_by(Person.name, Paperwork.description).all(),
expected)
- if select_type != '':
+ if select_type != '':
+ def test_mixed_entities_eight(self):
+ sess = create_session()
eq_(sess.query(func.count(Person.person_id))
.filter(Engineer.primary_language == 'java').all(),
[(1,)])
+ def test_mixed_entities_nine(self):
+ sess = create_session()
expected = [(u'Elbonia, Inc.', 1), (u'MegaCorp, Inc.', 4)]
eq_(sess.query(Company.name, func.count(Person.person_id))
.filter(Company.company_id == Person.company_id)
.order_by(Company.name).all(),
expected)
+ def test_mixed_entities_ten(self):
+ sess = create_session()
expected = [(u'Elbonia, Inc.', 1), (u'MegaCorp, Inc.', 4)]
eq_(sess.query(Company.name, func.count(Person.person_id))
.join(Company.employees)
.order_by(Company.name).all(),
expected)
+ #def test_mixed_entities(self):
+ # sess = create_session()
+ # TODO: I think raise error on these for now. different
+ # inheritance/loading schemes have different results here,
+ # all incorrect
+ #
+ # eq_(
+ # sess.query(Person.name, Engineer.primary_language).all(),
+ # [])
+
+ #def test_mixed_entities(self):
+ # sess = create_session()
+ # eq_(sess.query(
+ # Person.name,
+ # Engineer.primary_language,
+ # Manager.manager_name)
+ # .all(),
+ # [])
+
+ def test_mixed_entities_eleven(self):
+ sess = create_session()
+ expected = [(u'java',), (u'c++',), (u'cobol',)]
+ eq_(sess.query(Engineer.primary_language)
+ .filter(Person.type == 'engineer').all(),
+ expected)
+
+ def test_mixed_entities_twelve(self):
+ sess = create_session()
+ expected = [(u'vlad', u'Elbonia, Inc.')]
+ eq_(sess.query(Person.name, Company.name)
+ .join(Company.employees)
+ .filter(Company.name == 'Elbonia, Inc.').all(),
+ expected)
+
+ def test_mixed_entities_thirteen(self):
+ sess = create_session()
+ expected = [(u'pointy haired boss', u'fore')]
+ eq_(sess.query(Boss.name, Boss.golf_swing).all(), expected)
+
+ def test_mixed_entities_fourteen(self):
+ sess = create_session()
+ expected = [
+ (u'dilbert', u'java'),
+ (u'wally', u'c++'),
+ (u'vlad', u'cobol')]
+ eq_(sess.query(Engineer.name, Engineer.primary_language).all(),
+ expected)
+
+ def test_mixed_entities_fifteen(self):
+ sess = create_session()
+
+ expected = [(
+ u'Elbonia, Inc.',
+ Engineer(
+ status=u'elbonian engineer',
+ engineer_name=u'vlad',
+ name=u'vlad',
+ primary_language=u'cobol'))]
+ eq_(sess.query(Company.name, Person)
+ .join(Company.employees)
+ .filter(Company.name == 'Elbonia, Inc.').all(),
+ expected)
+
+ def test_mixed_entities_sixteen(self):
+ sess = create_session()
+ expected = [(
+ Engineer(
+ status=u'elbonian engineer',
+ engineer_name=u'vlad',
+ name=u'vlad',
+ primary_language=u'cobol'),
+ u'Elbonia, Inc.')]
+ eq_(sess.query(Person, Company.name)
+ .join(Company.employees)
+ .filter(Company.name == 'Elbonia, Inc.').all(),
+ expected)
+
+ def test_mixed_entities_seventeen(self):
+ sess = create_session()
+ expected = [('pointy haired boss',), ('dogbert',)]
+ eq_(sess.query(Manager.name).all(), expected)
+
+ def test_mixed_entities_eighteen(self):
+ sess = create_session()
+ expected = [('pointy haired boss foo',), ('dogbert foo',)]
+ eq_(sess.query(Manager.name + " foo").all(), expected)
+
+ def test_mixed_entities_nineteen(self):
+ sess = create_session()
+ row = sess.query(Engineer.name, Engineer.primary_language) \
+ .filter(Engineer.name == 'dilbert').first()
+ assert row.name == 'dilbert'
+ assert row.primary_language == 'java'
+
PolymorphicQueryTest.__name__ = "Polymorphic%sTest" % select_type
return PolymorphicQueryTest
exec("%s = testclass" % testclass.__name__)
del testclass
-
-class SelfReferentialTestJoinedToBase(fixtures.MappedTest):
-
- run_setup_mappers = 'once'
-
- @classmethod
- def define_tables(cls, metadata):
- Table('people', metadata,
- Column('person_id', Integer,
- primary_key=True,
- test_needs_autoincrement=True),
- Column('name', String(50)),
- Column('type', String(30)))
-
- Table('engineers', metadata,
- Column('person_id', Integer,
- ForeignKey('people.person_id'),
- primary_key=True),
- Column('primary_language', String(50)),
- Column('reports_to_id', Integer,
- ForeignKey('people.person_id')))
-
- @classmethod
- def setup_mappers(cls):
- engineers, people = cls.tables.engineers, cls.tables.people
-
- mapper(Person, people,
- polymorphic_on=people.c.type,
- polymorphic_identity='person')
-
- mapper(Engineer, engineers,
- inherits=Person,
- inherit_condition=engineers.c.person_id == people.c.person_id,
- polymorphic_identity='engineer',
- properties={
- 'reports_to':relationship(
- Person,
- primaryjoin=
- people.c.person_id == engineers.c.reports_to_id)})
-
- def test_has(self):
- p1 = Person(name='dogbert')
- e1 = Engineer(name='dilbert', primary_language='java', reports_to=p1)
- sess = create_session()
- sess.add(p1)
- sess.add(e1)
- sess.flush()
- sess.expunge_all()
- eq_(sess.query(Engineer)
- .filter(Engineer.reports_to.has(Person.name == 'dogbert'))
- .first(),
- Engineer(name='dilbert'))
-
- def test_oftype_aliases_in_exists(self):
- e1 = Engineer(name='dilbert', primary_language='java')
- e2 = Engineer(name='wally', primary_language='c++', reports_to=e1)
- sess = create_session()
- sess.add_all([e1, e2])
- sess.flush()
- eq_(sess.query(Engineer)
- .filter(Engineer.reports_to
- .of_type(Engineer)
- .has(Engineer.name == 'dilbert'))
- .first(),
- e2)
-
- def test_join(self):
- p1 = Person(name='dogbert')
- e1 = Engineer(name='dilbert', primary_language='java', reports_to=p1)
- sess = create_session()
- sess.add(p1)
- sess.add(e1)
- sess.flush()
- sess.expunge_all()
- eq_(sess.query(Engineer)
- .join('reports_to', aliased=True)
- .filter(Person.name == 'dogbert').first(),
- Engineer(name='dilbert'))
-
-class SelfReferentialJ2JTest(fixtures.MappedTest):
-
- run_setup_mappers = 'once'
-
- @classmethod
- def define_tables(cls, metadata):
- people = Table('people', metadata,
- Column('person_id', Integer,
- primary_key=True,
- test_needs_autoincrement=True),
- Column('name', String(50)),
- Column('type', String(30)))
-
- engineers = Table('engineers', metadata,
- Column('person_id', Integer,
- ForeignKey('people.person_id'),
- primary_key=True),
- Column('primary_language', String(50)),
- Column('reports_to_id', Integer,
- ForeignKey('managers.person_id'))
- )
-
- managers = Table('managers', metadata,
- Column('person_id', Integer, ForeignKey('people.person_id'),
- primary_key=True),
- )
-
- @classmethod
- def setup_mappers(cls):
- engineers = cls.tables.engineers
- managers = cls.tables.managers
- people = cls.tables.people
-
- mapper(Person, people,
- polymorphic_on=people.c.type,
- polymorphic_identity='person')
-
- mapper(Manager, managers,
- inherits=Person,
- polymorphic_identity='manager')
-
- mapper(Engineer, engineers,
- inherits=Person,
- polymorphic_identity='engineer',
- properties={
- 'reports_to':relationship(
- Manager,
- primaryjoin=
- managers.c.person_id == engineers.c.reports_to_id,
- backref='engineers')})
-
- def test_has(self):
- m1 = Manager(name='dogbert')
- e1 = Engineer(name='dilbert', primary_language='java', reports_to=m1)
- sess = create_session()
- sess.add(m1)
- sess.add(e1)
- sess.flush()
- sess.expunge_all()
-
- eq_(sess.query(Engineer)
- .filter(Engineer.reports_to.has(Manager.name == 'dogbert'))
- .first(),
- Engineer(name='dilbert'))
-
- def test_join(self):
- m1 = Manager(name='dogbert')
- e1 = Engineer(name='dilbert', primary_language='java', reports_to=m1)
- sess = create_session()
- sess.add(m1)
- sess.add(e1)
- sess.flush()
- sess.expunge_all()
-
- eq_(sess.query(Engineer)
- .join('reports_to', aliased=True)
- .filter(Manager.name == 'dogbert').first(),
- Engineer(name='dilbert'))
-
- def test_filter_aliasing(self):
- m1 = Manager(name='dogbert')
- m2 = Manager(name='foo')
- e1 = Engineer(name='wally', primary_language='java', reports_to=m1)
- e2 = Engineer(name='dilbert', primary_language='c++', reports_to=m2)
- e3 = Engineer(name='etc', primary_language='c++')
-
- sess = create_session()
- sess.add_all([m1, m2, e1, e2, e3])
- sess.flush()
- sess.expunge_all()
-
- # filter aliasing applied to Engineer doesn't whack Manager
- eq_(sess.query(Manager)
- .join(Manager.engineers)
- .filter(Manager.name == 'dogbert').all(),
- [m1])
-
- eq_(sess.query(Manager)
- .join(Manager.engineers)
- .filter(Engineer.name == 'dilbert').all(),
- [m2])
-
- eq_(sess.query(Manager, Engineer)
- .join(Manager.engineers)
- .order_by(Manager.name.desc()).all(),
- [(m2, e2), (m1, e1)])
-
- def test_relationship_compare(self):
- m1 = Manager(name='dogbert')
- m2 = Manager(name='foo')
- e1 = Engineer(name='dilbert', primary_language='java', reports_to=m1)
- e2 = Engineer(name='wally', primary_language='c++', reports_to=m2)
- e3 = Engineer(name='etc', primary_language='c++')
-
- sess = create_session()
- sess.add(m1)
- sess.add(m2)
- sess.add(e1)
- sess.add(e2)
- sess.add(e3)
- sess.flush()
- sess.expunge_all()
-
- eq_(sess.query(Manager)
- .join(Manager.engineers)
- .filter(Engineer.reports_to == None).all(),
- [])
-
- eq_(sess.query(Manager)
- .join(Manager.engineers)
- .filter(Engineer.reports_to == m1).all(),
- [m1])
-
-class SelfReferentialJ2JSelfTest(fixtures.MappedTest):
-
- run_setup_mappers = 'once'
-
- @classmethod
- def define_tables(cls, metadata):
- people = Table('people', metadata,
- Column('person_id', Integer,
- primary_key=True,
- test_needs_autoincrement=True),
- Column('name', String(50)),
- Column('type', String(30)))
-
- engineers = Table('engineers', metadata,
- Column('person_id', Integer,
- ForeignKey('people.person_id'),
- primary_key=True),
- Column('reports_to_id', Integer,
- ForeignKey('engineers.person_id')))
-
- @classmethod
- def setup_mappers(cls):
- engineers = cls.tables.engineers
- people = cls.tables.people
-
- mapper(Person, people,
- polymorphic_on=people.c.type,
- polymorphic_identity='person')
-
- mapper(Engineer, engineers,
- inherits=Person,
- polymorphic_identity='engineer',
- properties={
- 'reports_to':relationship(
- Engineer,
- primaryjoin=
- engineers.c.person_id == engineers.c.reports_to_id,
- backref='engineers',
- remote_side=engineers.c.person_id)})
-
- def _two_obj_fixture(self):
- e1 = Engineer(name='wally')
- e2 = Engineer(name='dilbert', reports_to=e1)
- sess = Session()
- sess.add_all([e1, e2])
- sess.commit()
- return sess
-
- def _five_obj_fixture(self):
- sess = Session()
- e1, e2, e3, e4, e5 = [
- Engineer(name='e%d' % (i + 1)) for i in xrange(5)
- ]
- e3.reports_to = e1
- e4.reports_to = e2
- sess.add_all([e1, e2, e3, e4, e5])
- sess.commit()
- return sess
-
- def test_has(self):
- sess = self._two_obj_fixture()
- eq_(sess.query(Engineer)
- .filter(Engineer.reports_to.has(Engineer.name == 'wally'))
- .first(),
- Engineer(name='dilbert'))
-
- def test_join_explicit_alias(self):
- sess = self._five_obj_fixture()
- ea = aliased(Engineer)
- eq_(sess.query(Engineer)
- .join(ea, Engineer.engineers)
- .filter(Engineer.name == 'e1').all(),
- [Engineer(name='e1')])
-
- def test_join_aliased_flag_one(self):
- sess = self._two_obj_fixture()
- eq_(sess.query(Engineer)
- .join('reports_to', aliased=True)
- .filter(Engineer.name == 'wally').first(),
- Engineer(name='dilbert'))
-
- def test_join_aliased_flag_two(self):
- sess = self._five_obj_fixture()
- eq_(sess.query(Engineer)
- .join(Engineer.engineers, aliased=True)
- .filter(Engineer.name == 'e4').all(),
- [Engineer(name='e2')])
-
- def test_relationship_compare(self):
- sess = self._five_obj_fixture()
- e1 = sess.query(Engineer).filter_by(name='e1').one()
-
- eq_(sess.query(Engineer)
- .join(Engineer.engineers, aliased=True)
- .filter(Engineer.reports_to == None).all(),
- [])
-
- eq_(sess.query(Engineer)
- .join(Engineer.engineers, aliased=True)
- .filter(Engineer.reports_to == e1).all(),
- [e1])
-
-class M2MFilterTest(fixtures.MappedTest):
-
- run_setup_mappers = 'once'
- run_inserts = 'once'
- run_deletes = None
-
- @classmethod
- def define_tables(cls, metadata):
- organizations = Table('organizations', metadata,
- Column('id', Integer,
- primary_key=True,
- test_needs_autoincrement=True),
- Column('name', String(50)))
-
- engineers_to_org = Table('engineers_to_org', metadata,
- Column('org_id', Integer,
- ForeignKey('organizations.id')),
- Column('engineer_id', Integer,
- ForeignKey('engineers.person_id')))
-
- people = Table('people', metadata,
- Column('person_id', Integer,
- primary_key=True,
- test_needs_autoincrement=True),
- Column('name', String(50)),
- Column('type', String(30)))
-
- engineers = Table('engineers', metadata,
- Column('person_id', Integer,
- ForeignKey('people.person_id'),
- primary_key=True),
- Column('primary_language', String(50)))
-
- @classmethod
- def setup_mappers(cls):
- organizations = cls.tables.organizations
- people = cls.tables.people
- engineers = cls.tables.engineers
- engineers_to_org = cls.tables.engineers_to_org
-
- class Organization(cls.Comparable):
- pass
-
- mapper(Organization, organizations,
- properties={
- 'engineers':relationship(
- Engineer,
- secondary=engineers_to_org,
- backref='organizations')})
-
- mapper(Person, people,
- polymorphic_on=people.c.type,
- polymorphic_identity='person')
-
- mapper(Engineer, engineers,
- inherits=Person,
- polymorphic_identity='engineer')
-
- @classmethod
- def insert_data(cls):
- Organization = cls.classes.Organization
- e1 = Engineer(name='e1')
- e2 = Engineer(name='e2')
- e3 = Engineer(name='e3')
- e4 = Engineer(name='e4')
- org1 = Organization(name='org1', engineers=[e1, e2])
- org2 = Organization(name='org2', engineers=[e3, e4])
- sess = create_session()
- sess.add(org1)
- sess.add(org2)
- sess.flush()
-
- def test_not_contains(self):
- Organization = self.classes.Organization
- sess = create_session()
- e1 = sess.query(Person).filter(Engineer.name == 'e1').one()
-
- # this works
- eq_(sess.query(Organization)
- .filter(~Organization.engineers
- .of_type(Engineer)
- .contains(e1))
- .all(),
- [Organization(name='org2')])
-
- # this had a bug
- eq_(sess.query(Organization)
- .filter(~Organization.engineers
- .contains(e1))
- .all(),
- [Organization(name='org2')])
-
- def test_any(self):
- sess = create_session()
- Organization = self.classes.Organization
-
- eq_(sess.query(Organization)
- .filter(Organization.engineers
- .of_type(Engineer)
- .any(Engineer.name == 'e1'))
- .all(),
- [Organization(name='org1')])
-
- eq_(sess.query(Organization)
- .filter(Organization.engineers
- .any(Engineer.name == 'e1'))
- .all(),
- [Organization(name='org1')])
-
-class SelfReferentialM2MTest(fixtures.MappedTest, AssertsCompiledSQL):
-
- @classmethod
- def define_tables(cls, metadata):
- Table('secondary', metadata,
- Column('left_id', Integer,
- ForeignKey('parent.id'),
- nullable=False),
- Column('right_id', Integer,
- ForeignKey('parent.id'),
- nullable=False))
-
- Table('parent', metadata,
- Column('id', Integer,
- primary_key=True,
- test_needs_autoincrement=True),
- Column('cls', String(50)))
-
- Table('child1', metadata,
- Column('id', Integer,
- ForeignKey('parent.id'),
- primary_key=True))
-
- Table('child2', metadata,
- Column('id', Integer,
- ForeignKey('parent.id'),
- primary_key=True))
-
- @classmethod
- def setup_classes(cls):
- class Parent(cls.Basic):
- pass
- class Child1(Parent):
- pass
- class Child2(Parent):
- pass
-
- @classmethod
- def setup_mappers(cls):
- child1 = cls.tables.child1
- child2 = cls.tables.child2
- Parent = cls.classes.Parent
- parent = cls.tables.parent
- Child1 = cls.classes.Child1
- Child2 = cls.classes.Child2
- secondary = cls.tables.secondary
-
- mapper(Parent, parent,
- polymorphic_on=parent.c.cls)
-
- mapper(Child1, child1,
- inherits=Parent,
- polymorphic_identity='child1',
- properties={
- 'left_child2':relationship(
- Child2,
- secondary=secondary,
- primaryjoin=parent.c.id == secondary.c.right_id,
- secondaryjoin=parent.c.id == secondary.c.left_id,
- uselist=False,
- backref="right_children")})
-
- mapper(Child2, child2,
- inherits=Parent,
- polymorphic_identity='child2')
-
- def test_query_crit(self):
- Child1, Child2 = self.classes.Child1, self.classes.Child2
- sess = create_session()
- c11, c12, c13 = Child1(), Child1(), Child1()
- c21, c22, c23 = Child2(), Child2(), Child2()
- c11.left_child2 = c22
- c12.left_child2 = c22
- c13.left_child2 = c23
- sess.add_all([c11, c12, c13, c21, c22, c23])
- sess.flush()
-
- # test that the join to Child2 doesn't alias Child1 in the select
- eq_(set(sess.query(Child1).join(Child1.left_child2)),
- set([c11, c12, c13]))
-
- eq_(set(sess.query(Child1, Child2).join(Child1.left_child2)),
- set([(c11, c22), (c12, c22), (c13, c23)]))
-
- # test __eq__() on property is annotating correctly
- eq_(set(sess.query(Child2)
- .join(Child2.right_children)
- .filter(Child1.left_child2 == c22)),
- set([c22]))
-
- # test the same again
- self.assert_compile(
- sess.query(Child2)
- .join(Child2.right_children)
- .filter(Child1.left_child2 == c22)
- .with_labels().statement,
- "SELECT child2.id AS child2_id, parent.id AS parent_id, "
- "parent.cls AS parent_cls FROM secondary AS secondary_1, "
- "parent JOIN child2 ON parent.id = child2.id JOIN secondary AS "
- "secondary_2 ON parent.id = secondary_2.left_id JOIN (SELECT "
- "parent.id AS parent_id, parent.cls AS parent_cls, child1.id AS "
- "child1_id FROM parent JOIN child1 ON parent.id = child1.id) AS "
- "anon_1 ON anon_1.parent_id = secondary_2.right_id WHERE "
- "anon_1.parent_id = secondary_1.right_id AND :param_1 = "
- "secondary_1.left_id",
- dialect=default.DefaultDialect()
- )
-
- def test_eager_join(self):
- Child1, Child2 = self.classes.Child1, self.classes.Child2
- sess = create_session()
- c1 = Child1()
- c1.left_child2 = Child2()
- sess.add(c1)
- sess.flush()
-
- # test that the splicing of the join works here, doesn't break in
- # the middle of "parent join child1"
- q = sess.query(Child1).options(joinedload('left_child2'))
- self.assert_compile(q.limit(1).with_labels().statement,
- "SELECT anon_1.child1_id AS anon_1_child1_id, anon_1.parent_id "
- "AS anon_1_parent_id, anon_1.parent_cls AS anon_1_parent_cls, "
- "anon_2.child2_id AS anon_2_child2_id, anon_2.parent_id AS "
- "anon_2_parent_id, anon_2.parent_cls AS anon_2_parent_cls FROM "
- "(SELECT child1.id AS child1_id, parent.id AS parent_id, "
- "parent.cls AS parent_cls FROM parent JOIN child1 ON parent.id = "
- "child1.id LIMIT :param_1) AS anon_1 LEFT OUTER JOIN secondary "
- "AS secondary_1 ON anon_1.parent_id = secondary_1.right_id LEFT "
- "OUTER JOIN (SELECT parent.id AS parent_id, parent.cls AS "
- "parent_cls, child2.id AS child2_id FROM parent JOIN child2 ON "
- "parent.id = child2.id) AS anon_2 ON anon_2.parent_id = "
- "secondary_1.left_id",
- {'param_1':1},
- dialect=default.DefaultDialect())
-
- # another way to check
- assert q.limit(1).with_labels().subquery().count().scalar() == 1
- assert q.first() is c1
-
- def test_subquery_load(self):
- Child1, Child2 = self.classes.Child1, self.classes.Child2
- sess = create_session()
- c1 = Child1()
- c1.left_child2 = Child2()
- sess.add(c1)
- sess.flush()
- sess.expunge_all()
-
- query_ = sess.query(Child1).options(subqueryload('left_child2'))
- for row in query_.all():
- assert row.left_child2
-
-class EagerToSubclassTest(fixtures.MappedTest):
- """Test eager loads to subclass mappers"""
-
- run_setup_classes = 'once'
- run_setup_mappers = 'once'
- run_inserts = 'once'
- run_deletes = None
-
- @classmethod
- def define_tables(cls, metadata):
- Table('parent', metadata,
- Column('id', Integer,
- primary_key=True,
- test_needs_autoincrement=True),
- Column('data', String(10)))
-
- Table('base', metadata,
- Column('id', Integer,
- primary_key=True,
- test_needs_autoincrement=True),
- Column('type', String(10)),
- Column('related_id', Integer,
- ForeignKey('related.id')))
-
- Table('sub', metadata,
- Column('id', Integer,
- ForeignKey('base.id'),
- primary_key=True),
- Column('data', String(10)),
- Column('parent_id', Integer,
- ForeignKey('parent.id'),
- nullable=False))
-
- Table('related', metadata,
- Column('id', Integer,
- primary_key=True,
- test_needs_autoincrement=True),
- Column('data', String(10)))
-
- @classmethod
- def setup_classes(cls):
- class Parent(cls.Comparable):
- pass
- class Base(cls.Comparable):
- pass
- class Sub(Base):
- pass
- class Related(cls.Comparable):
- pass
-
- @classmethod
- def setup_mappers(cls):
- sub = cls.tables.sub
- Sub = cls.classes.Sub
- base = cls.tables.base
- Base = cls.classes.Base
- parent = cls.tables.parent
- Parent = cls.classes.Parent
- related = cls.tables.related
- Related = cls.classes.Related
-
- mapper(Parent, parent,
- properties={'children':relationship(Sub, order_by=sub.c.data)})
-
- mapper(Base, base,
- polymorphic_on=base.c.type,
- polymorphic_identity='b',
- properties={'related':relationship(Related)})
-
- mapper(Sub, sub,
- inherits=Base,
- polymorphic_identity='s')
-
- mapper(Related, related)
-
- @classmethod
- def insert_data(cls):
- global p1, p2
-
- Parent = cls.classes.Parent
- Sub = cls.classes.Sub
- Related = cls.classes.Related
- sess = Session()
- r1, r2 = Related(data='r1'), Related(data='r2')
- s1 = Sub(data='s1', related=r1)
- s2 = Sub(data='s2', related=r2)
- s3 = Sub(data='s3')
- s4 = Sub(data='s4', related=r2)
- s5 = Sub(data='s5')
- p1 = Parent(data='p1', children=[s1, s2, s3])
- p2 = Parent(data='p2', children=[s4, s5])
- sess.add(p1)
- sess.add(p2)
- sess.commit()
-
- def test_joinedload(self):
- Parent = self.classes.Parent
- sess = Session()
- def go():
- eq_(sess.query(Parent)
- .options(joinedload(Parent.children)).all(),
- [p1, p2])
- self.assert_sql_count(testing.db, go, 1)
-
- def test_contains_eager(self):
- Parent = self.classes.Parent
- Sub = self.classes.Sub
- sess = Session()
- def go():
- eq_(sess.query(Parent)
- .join(Parent.children)
- .options(contains_eager(Parent.children))
- .order_by(Parent.data, Sub.data).all(),
- [p1, p2])
- self.assert_sql_count(testing.db, go, 1)
-
- def test_subq_through_related(self):
- Parent = self.classes.Parent
- Sub = self.classes.Sub
- sess = Session()
- def go():
- eq_(sess.query(Parent)
- .options(subqueryload_all(Parent.children, Sub.related))
- .order_by(Parent.data).all(),
- [p1, p2])
- self.assert_sql_count(testing.db, go, 3)
-
-class SubClassEagerToSubClassTest(fixtures.MappedTest):
- """Test joinedloads from subclass to subclass mappers"""
-
- run_setup_classes = 'once'
- run_setup_mappers = 'once'
- run_inserts = 'once'
- run_deletes = None
-
- @classmethod
- def define_tables(cls, metadata):
- Table('parent', metadata,
- Column('id', Integer,
- primary_key=True,
- test_needs_autoincrement=True),
- Column('type', String(10)),
- )
-
- Table('subparent', metadata,
- Column('id', Integer,
- ForeignKey('parent.id'),
- primary_key=True),
- Column('data', String(10)),
- )
-
- Table('base', metadata,
- Column('id', Integer,
- primary_key=True,
- test_needs_autoincrement=True),
- Column('type', String(10)),
- )
-
- Table('sub', metadata,
- Column('id', Integer,
- ForeignKey('base.id'),
- primary_key=True),
- Column('data', String(10)),
- Column('subparent_id', Integer,
- ForeignKey('subparent.id'),
- nullable=False)
- )
-
- @classmethod
- def setup_classes(cls):
- class Parent(cls.Comparable):
- pass
- class Subparent(Parent):
- pass
- class Base(cls.Comparable):
- pass
- class Sub(Base):
- pass
-
- @classmethod
- def setup_mappers(cls):
- sub = cls.tables.sub
- Sub = cls.classes.Sub
- base = cls.tables.base
- Base = cls.classes.Base
- parent = cls.tables.parent
- Parent = cls.classes.Parent
- subparent = cls.tables.subparent
- Subparent = cls.classes.Subparent
-
- mapper(Parent, parent,
- polymorphic_on=parent.c.type,
- polymorphic_identity='b')
-
- mapper(Subparent, subparent,
- inherits=Parent,
- polymorphic_identity='s',
- properties={
- 'children':relationship(Sub, order_by=base.c.id)})
-
- mapper(Base, base,
- polymorphic_on=base.c.type,
- polymorphic_identity='b')
-
- mapper(Sub, sub,
- inherits=Base,
- polymorphic_identity='s')
-
- @classmethod
- def insert_data(cls):
- global p1, p2
-
- Sub, Subparent = cls.classes.Sub, cls.classes.Subparent
- sess = create_session()
- p1 = Subparent(
- data='p1',
- children=[Sub(data='s1'), Sub(data='s2'), Sub(data='s3')])
- p2 = Subparent(
- data='p2',
- children=[Sub(data='s4'), Sub(data='s5')])
- sess.add(p1)
- sess.add(p2)
- sess.flush()
-
- def test_joinedload(self):
- Subparent = self.classes.Subparent
-
- sess = create_session()
- def go():
- eq_(sess.query(Subparent)
- .options(joinedload(Subparent.children)).all(),
- [p1, p2])
- self.assert_sql_count(testing.db, go, 1)
-
- sess.expunge_all()
- def go():
- eq_(sess.query(Subparent)
- .options(joinedload("children")).all(),
- [p1, p2])
- self.assert_sql_count(testing.db, go, 1)
-
- def test_contains_eager(self):
- Subparent = self.classes.Subparent
-
- sess = create_session()
- def go():
- eq_(sess.query(Subparent)
- .join(Subparent.children)
- .options(contains_eager(Subparent.children)).all(),
- [p1, p2])
- self.assert_sql_count(testing.db, go, 1)
-
- sess.expunge_all()
- def go():
- eq_(sess.query(Subparent)
- .join(Subparent.children)
- .options(contains_eager("children")).all(),
- [p1, p2])
- self.assert_sql_count(testing.db, go, 1)
-
- def test_subqueryload(self):
- Subparent = self.classes.Subparent
-
- sess = create_session()
- def go():
- eq_(sess.query(Subparent)
- .options(subqueryload(Subparent.children)).all(),
- [p1, p2])
- self.assert_sql_count(testing.db, go, 2)
-
- sess.expunge_all()
- def go():
- eq_(sess.query(Subparent)
- .options(subqueryload("children")).all(),
- [p1, p2])
- self.assert_sql_count(testing.db, go, 2)
-
--- /dev/null
+from sqlalchemy.orm import create_session, relationship, mapper, \
+ contains_eager, joinedload, subqueryload, subqueryload_all,\
+ Session, aliased
+
+from sqlalchemy import Integer, String, ForeignKey
+from sqlalchemy.engine import default
+
+from test.lib import AssertsCompiledSQL, fixtures, testing
+from test.lib.schema import Table, Column
+from test.lib.testing import assert_raises, eq_
+
+class Company(fixtures.ComparableEntity):
+ pass
+class Person(fixtures.ComparableEntity):
+ pass
+class Engineer(Person):
+ pass
+class Manager(Person):
+ pass
+class Boss(Manager):
+ pass
+class Machine(fixtures.ComparableEntity):
+ pass
+class Paperwork(fixtures.ComparableEntity):
+ pass
+
+class SelfReferentialTestJoinedToBase(fixtures.MappedTest):
+
+ run_setup_mappers = 'once'
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table('people', metadata,
+ Column('person_id', Integer,
+ primary_key=True,
+ test_needs_autoincrement=True),
+ Column('name', String(50)),
+ Column('type', String(30)))
+
+ Table('engineers', metadata,
+ Column('person_id', Integer,
+ ForeignKey('people.person_id'),
+ primary_key=True),
+ Column('primary_language', String(50)),
+ Column('reports_to_id', Integer,
+ ForeignKey('people.person_id')))
+
+ @classmethod
+ def setup_mappers(cls):
+ engineers, people = cls.tables.engineers, cls.tables.people
+
+ mapper(Person, people,
+ polymorphic_on=people.c.type,
+ polymorphic_identity='person')
+
+ mapper(Engineer, engineers,
+ inherits=Person,
+ inherit_condition=engineers.c.person_id == people.c.person_id,
+ polymorphic_identity='engineer',
+ properties={
+ 'reports_to':relationship(
+ Person,
+ primaryjoin=
+ people.c.person_id == engineers.c.reports_to_id)})
+
+ def test_has(self):
+ p1 = Person(name='dogbert')
+ e1 = Engineer(name='dilbert', primary_language='java', reports_to=p1)
+ sess = create_session()
+ sess.add(p1)
+ sess.add(e1)
+ sess.flush()
+ sess.expunge_all()
+ eq_(sess.query(Engineer)
+ .filter(Engineer.reports_to.has(Person.name == 'dogbert'))
+ .first(),
+ Engineer(name='dilbert'))
+
+ def test_oftype_aliases_in_exists(self):
+ e1 = Engineer(name='dilbert', primary_language='java')
+ e2 = Engineer(name='wally', primary_language='c++', reports_to=e1)
+ sess = create_session()
+ sess.add_all([e1, e2])
+ sess.flush()
+ eq_(sess.query(Engineer)
+ .filter(Engineer.reports_to
+ .of_type(Engineer)
+ .has(Engineer.name == 'dilbert'))
+ .first(),
+ e2)
+
+ def test_join(self):
+ p1 = Person(name='dogbert')
+ e1 = Engineer(name='dilbert', primary_language='java', reports_to=p1)
+ sess = create_session()
+ sess.add(p1)
+ sess.add(e1)
+ sess.flush()
+ sess.expunge_all()
+ eq_(sess.query(Engineer)
+ .join('reports_to', aliased=True)
+ .filter(Person.name == 'dogbert').first(),
+ Engineer(name='dilbert'))
+
+class SelfReferentialJ2JTest(fixtures.MappedTest):
+
+ run_setup_mappers = 'once'
+
+ @classmethod
+ def define_tables(cls, metadata):
+ people = Table('people', metadata,
+ Column('person_id', Integer,
+ primary_key=True,
+ test_needs_autoincrement=True),
+ Column('name', String(50)),
+ Column('type', String(30)))
+
+ engineers = Table('engineers', metadata,
+ Column('person_id', Integer,
+ ForeignKey('people.person_id'),
+ primary_key=True),
+ Column('primary_language', String(50)),
+ Column('reports_to_id', Integer,
+ ForeignKey('managers.person_id'))
+ )
+
+ managers = Table('managers', metadata,
+ Column('person_id', Integer, ForeignKey('people.person_id'),
+ primary_key=True),
+ )
+
+ @classmethod
+ def setup_mappers(cls):
+ engineers = cls.tables.engineers
+ managers = cls.tables.managers
+ people = cls.tables.people
+
+ mapper(Person, people,
+ polymorphic_on=people.c.type,
+ polymorphic_identity='person')
+
+ mapper(Manager, managers,
+ inherits=Person,
+ polymorphic_identity='manager')
+
+ mapper(Engineer, engineers,
+ inherits=Person,
+ polymorphic_identity='engineer',
+ properties={
+ 'reports_to':relationship(
+ Manager,
+ primaryjoin=
+ managers.c.person_id == engineers.c.reports_to_id,
+ backref='engineers')})
+
+ def test_has(self):
+ m1 = Manager(name='dogbert')
+ e1 = Engineer(name='dilbert', primary_language='java', reports_to=m1)
+ sess = create_session()
+ sess.add(m1)
+ sess.add(e1)
+ sess.flush()
+ sess.expunge_all()
+
+ eq_(sess.query(Engineer)
+ .filter(Engineer.reports_to.has(Manager.name == 'dogbert'))
+ .first(),
+ Engineer(name='dilbert'))
+
+ def test_join(self):
+ m1 = Manager(name='dogbert')
+ e1 = Engineer(name='dilbert', primary_language='java', reports_to=m1)
+ sess = create_session()
+ sess.add(m1)
+ sess.add(e1)
+ sess.flush()
+ sess.expunge_all()
+
+ eq_(sess.query(Engineer)
+ .join('reports_to', aliased=True)
+ .filter(Manager.name == 'dogbert').first(),
+ Engineer(name='dilbert'))
+
+ def test_filter_aliasing(self):
+ m1 = Manager(name='dogbert')
+ m2 = Manager(name='foo')
+ e1 = Engineer(name='wally', primary_language='java', reports_to=m1)
+ e2 = Engineer(name='dilbert', primary_language='c++', reports_to=m2)
+ e3 = Engineer(name='etc', primary_language='c++')
+
+ sess = create_session()
+ sess.add_all([m1, m2, e1, e2, e3])
+ sess.flush()
+ sess.expunge_all()
+
+ # filter aliasing applied to Engineer doesn't whack Manager
+ eq_(sess.query(Manager)
+ .join(Manager.engineers)
+ .filter(Manager.name == 'dogbert').all(),
+ [m1])
+
+ eq_(sess.query(Manager)
+ .join(Manager.engineers)
+ .filter(Engineer.name == 'dilbert').all(),
+ [m2])
+
+ eq_(sess.query(Manager, Engineer)
+ .join(Manager.engineers)
+ .order_by(Manager.name.desc()).all(),
+ [(m2, e2), (m1, e1)])
+
+ def test_relationship_compare(self):
+ m1 = Manager(name='dogbert')
+ m2 = Manager(name='foo')
+ e1 = Engineer(name='dilbert', primary_language='java', reports_to=m1)
+ e2 = Engineer(name='wally', primary_language='c++', reports_to=m2)
+ e3 = Engineer(name='etc', primary_language='c++')
+
+ sess = create_session()
+ sess.add(m1)
+ sess.add(m2)
+ sess.add(e1)
+ sess.add(e2)
+ sess.add(e3)
+ sess.flush()
+ sess.expunge_all()
+
+ eq_(sess.query(Manager)
+ .join(Manager.engineers)
+ .filter(Engineer.reports_to == None).all(),
+ [])
+
+ eq_(sess.query(Manager)
+ .join(Manager.engineers)
+ .filter(Engineer.reports_to == m1).all(),
+ [m1])
+
+class SelfReferentialJ2JSelfTest(fixtures.MappedTest):
+
+ run_setup_mappers = 'once'
+
+ @classmethod
+ def define_tables(cls, metadata):
+ people = Table('people', metadata,
+ Column('person_id', Integer,
+ primary_key=True,
+ test_needs_autoincrement=True),
+ Column('name', String(50)),
+ Column('type', String(30)))
+
+ engineers = Table('engineers', metadata,
+ Column('person_id', Integer,
+ ForeignKey('people.person_id'),
+ primary_key=True),
+ Column('reports_to_id', Integer,
+ ForeignKey('engineers.person_id')))
+
+ @classmethod
+ def setup_mappers(cls):
+ engineers = cls.tables.engineers
+ people = cls.tables.people
+
+ mapper(Person, people,
+ polymorphic_on=people.c.type,
+ polymorphic_identity='person')
+
+ mapper(Engineer, engineers,
+ inherits=Person,
+ polymorphic_identity='engineer',
+ properties={
+ 'reports_to':relationship(
+ Engineer,
+ primaryjoin=
+ engineers.c.person_id == engineers.c.reports_to_id,
+ backref='engineers',
+ remote_side=engineers.c.person_id)})
+
+ def _two_obj_fixture(self):
+ e1 = Engineer(name='wally')
+ e2 = Engineer(name='dilbert', reports_to=e1)
+ sess = Session()
+ sess.add_all([e1, e2])
+ sess.commit()
+ return sess
+
+ def _five_obj_fixture(self):
+ sess = Session()
+ e1, e2, e3, e4, e5 = [
+ Engineer(name='e%d' % (i + 1)) for i in xrange(5)
+ ]
+ e3.reports_to = e1
+ e4.reports_to = e2
+ sess.add_all([e1, e2, e3, e4, e5])
+ sess.commit()
+ return sess
+
+ def test_has(self):
+ sess = self._two_obj_fixture()
+ eq_(sess.query(Engineer)
+ .filter(Engineer.reports_to.has(Engineer.name == 'wally'))
+ .first(),
+ Engineer(name='dilbert'))
+
+ def test_join_explicit_alias(self):
+ sess = self._five_obj_fixture()
+ ea = aliased(Engineer)
+ eq_(sess.query(Engineer)
+ .join(ea, Engineer.engineers)
+ .filter(Engineer.name == 'e1').all(),
+ [Engineer(name='e1')])
+
+ def test_join_aliased_flag_one(self):
+ sess = self._two_obj_fixture()
+ eq_(sess.query(Engineer)
+ .join('reports_to', aliased=True)
+ .filter(Engineer.name == 'wally').first(),
+ Engineer(name='dilbert'))
+
+ def test_join_aliased_flag_two(self):
+ sess = self._five_obj_fixture()
+ eq_(sess.query(Engineer)
+ .join(Engineer.engineers, aliased=True)
+ .filter(Engineer.name == 'e4').all(),
+ [Engineer(name='e2')])
+
+ def test_relationship_compare(self):
+ sess = self._five_obj_fixture()
+ e1 = sess.query(Engineer).filter_by(name='e1').one()
+
+ eq_(sess.query(Engineer)
+ .join(Engineer.engineers, aliased=True)
+ .filter(Engineer.reports_to == None).all(),
+ [])
+
+ eq_(sess.query(Engineer)
+ .join(Engineer.engineers, aliased=True)
+ .filter(Engineer.reports_to == e1).all(),
+ [e1])
+
+class M2MFilterTest(fixtures.MappedTest):
+
+ run_setup_mappers = 'once'
+ run_inserts = 'once'
+ run_deletes = None
+
+ @classmethod
+ def define_tables(cls, metadata):
+ organizations = Table('organizations', metadata,
+ Column('id', Integer,
+ primary_key=True,
+ test_needs_autoincrement=True),
+ Column('name', String(50)))
+
+ engineers_to_org = Table('engineers_to_org', metadata,
+ Column('org_id', Integer,
+ ForeignKey('organizations.id')),
+ Column('engineer_id', Integer,
+ ForeignKey('engineers.person_id')))
+
+ people = Table('people', metadata,
+ Column('person_id', Integer,
+ primary_key=True,
+ test_needs_autoincrement=True),
+ Column('name', String(50)),
+ Column('type', String(30)))
+
+ engineers = Table('engineers', metadata,
+ Column('person_id', Integer,
+ ForeignKey('people.person_id'),
+ primary_key=True),
+ Column('primary_language', String(50)))
+
+ @classmethod
+ def setup_mappers(cls):
+ organizations = cls.tables.organizations
+ people = cls.tables.people
+ engineers = cls.tables.engineers
+ engineers_to_org = cls.tables.engineers_to_org
+
+ class Organization(cls.Comparable):
+ pass
+
+ mapper(Organization, organizations,
+ properties={
+ 'engineers':relationship(
+ Engineer,
+ secondary=engineers_to_org,
+ backref='organizations')})
+
+ mapper(Person, people,
+ polymorphic_on=people.c.type,
+ polymorphic_identity='person')
+
+ mapper(Engineer, engineers,
+ inherits=Person,
+ polymorphic_identity='engineer')
+
+ @classmethod
+ def insert_data(cls):
+ Organization = cls.classes.Organization
+ e1 = Engineer(name='e1')
+ e2 = Engineer(name='e2')
+ e3 = Engineer(name='e3')
+ e4 = Engineer(name='e4')
+ org1 = Organization(name='org1', engineers=[e1, e2])
+ org2 = Organization(name='org2', engineers=[e3, e4])
+ sess = create_session()
+ sess.add(org1)
+ sess.add(org2)
+ sess.flush()
+
+ def test_not_contains(self):
+ Organization = self.classes.Organization
+ sess = create_session()
+ e1 = sess.query(Person).filter(Engineer.name == 'e1').one()
+
+ # this works
+ eq_(sess.query(Organization)
+ .filter(~Organization.engineers
+ .of_type(Engineer)
+ .contains(e1))
+ .all(),
+ [Organization(name='org2')])
+
+ # this had a bug
+ eq_(sess.query(Organization)
+ .filter(~Organization.engineers
+ .contains(e1))
+ .all(),
+ [Organization(name='org2')])
+
+ def test_any(self):
+ sess = create_session()
+ Organization = self.classes.Organization
+
+ eq_(sess.query(Organization)
+ .filter(Organization.engineers
+ .of_type(Engineer)
+ .any(Engineer.name == 'e1'))
+ .all(),
+ [Organization(name='org1')])
+
+ eq_(sess.query(Organization)
+ .filter(Organization.engineers
+ .any(Engineer.name == 'e1'))
+ .all(),
+ [Organization(name='org1')])
+
+class SelfReferentialM2MTest(fixtures.MappedTest, AssertsCompiledSQL):
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table('secondary', metadata,
+ Column('left_id', Integer,
+ ForeignKey('parent.id'),
+ nullable=False),
+ Column('right_id', Integer,
+ ForeignKey('parent.id'),
+ nullable=False))
+
+ Table('parent', metadata,
+ Column('id', Integer,
+ primary_key=True,
+ test_needs_autoincrement=True),
+ Column('cls', String(50)))
+
+ Table('child1', metadata,
+ Column('id', Integer,
+ ForeignKey('parent.id'),
+ primary_key=True))
+
+ Table('child2', metadata,
+ Column('id', Integer,
+ ForeignKey('parent.id'),
+ primary_key=True))
+
+ @classmethod
+ def setup_classes(cls):
+ class Parent(cls.Basic):
+ pass
+ class Child1(Parent):
+ pass
+ class Child2(Parent):
+ pass
+
+ @classmethod
+ def setup_mappers(cls):
+ child1 = cls.tables.child1
+ child2 = cls.tables.child2
+ Parent = cls.classes.Parent
+ parent = cls.tables.parent
+ Child1 = cls.classes.Child1
+ Child2 = cls.classes.Child2
+ secondary = cls.tables.secondary
+
+ mapper(Parent, parent,
+ polymorphic_on=parent.c.cls)
+
+ mapper(Child1, child1,
+ inherits=Parent,
+ polymorphic_identity='child1',
+ properties={
+ 'left_child2':relationship(
+ Child2,
+ secondary=secondary,
+ primaryjoin=parent.c.id == secondary.c.right_id,
+ secondaryjoin=parent.c.id == secondary.c.left_id,
+ uselist=False,
+ backref="right_children")})
+
+ mapper(Child2, child2,
+ inherits=Parent,
+ polymorphic_identity='child2')
+
+ def test_query_crit(self):
+ Child1, Child2 = self.classes.Child1, self.classes.Child2
+ sess = create_session()
+ c11, c12, c13 = Child1(), Child1(), Child1()
+ c21, c22, c23 = Child2(), Child2(), Child2()
+ c11.left_child2 = c22
+ c12.left_child2 = c22
+ c13.left_child2 = c23
+ sess.add_all([c11, c12, c13, c21, c22, c23])
+ sess.flush()
+
+ # test that the join to Child2 doesn't alias Child1 in the select
+ eq_(set(sess.query(Child1).join(Child1.left_child2)),
+ set([c11, c12, c13]))
+
+ eq_(set(sess.query(Child1, Child2).join(Child1.left_child2)),
+ set([(c11, c22), (c12, c22), (c13, c23)]))
+
+ # test __eq__() on property is annotating correctly
+ eq_(set(sess.query(Child2)
+ .join(Child2.right_children)
+ .filter(Child1.left_child2 == c22)),
+ set([c22]))
+
+ # test the same again
+ self.assert_compile(
+ sess.query(Child2)
+ .join(Child2.right_children)
+ .filter(Child1.left_child2 == c22)
+ .with_labels().statement,
+ "SELECT child2.id AS child2_id, parent.id AS parent_id, "
+ "parent.cls AS parent_cls FROM secondary AS secondary_1, "
+ "parent JOIN child2 ON parent.id = child2.id JOIN secondary AS "
+ "secondary_2 ON parent.id = secondary_2.left_id JOIN (SELECT "
+ "parent.id AS parent_id, parent.cls AS parent_cls, child1.id AS "
+ "child1_id FROM parent JOIN child1 ON parent.id = child1.id) AS "
+ "anon_1 ON anon_1.parent_id = secondary_2.right_id WHERE "
+ "anon_1.parent_id = secondary_1.right_id AND :param_1 = "
+ "secondary_1.left_id",
+ dialect=default.DefaultDialect()
+ )
+
+ def test_eager_join(self):
+ Child1, Child2 = self.classes.Child1, self.classes.Child2
+ sess = create_session()
+ c1 = Child1()
+ c1.left_child2 = Child2()
+ sess.add(c1)
+ sess.flush()
+
+ # test that the splicing of the join works here, doesn't break in
+ # the middle of "parent join child1"
+ q = sess.query(Child1).options(joinedload('left_child2'))
+ self.assert_compile(q.limit(1).with_labels().statement,
+ "SELECT anon_1.child1_id AS anon_1_child1_id, anon_1.parent_id "
+ "AS anon_1_parent_id, anon_1.parent_cls AS anon_1_parent_cls, "
+ "anon_2.child2_id AS anon_2_child2_id, anon_2.parent_id AS "
+ "anon_2_parent_id, anon_2.parent_cls AS anon_2_parent_cls FROM "
+ "(SELECT child1.id AS child1_id, parent.id AS parent_id, "
+ "parent.cls AS parent_cls FROM parent JOIN child1 ON parent.id = "
+ "child1.id LIMIT :param_1) AS anon_1 LEFT OUTER JOIN secondary "
+ "AS secondary_1 ON anon_1.parent_id = secondary_1.right_id LEFT "
+ "OUTER JOIN (SELECT parent.id AS parent_id, parent.cls AS "
+ "parent_cls, child2.id AS child2_id FROM parent JOIN child2 ON "
+ "parent.id = child2.id) AS anon_2 ON anon_2.parent_id = "
+ "secondary_1.left_id",
+ {'param_1':1},
+ dialect=default.DefaultDialect())
+
+ # another way to check
+ assert q.limit(1).with_labels().subquery().count().scalar() == 1
+ assert q.first() is c1
+
+ def test_subquery_load(self):
+ Child1, Child2 = self.classes.Child1, self.classes.Child2
+ sess = create_session()
+ c1 = Child1()
+ c1.left_child2 = Child2()
+ sess.add(c1)
+ sess.flush()
+ sess.expunge_all()
+
+ query_ = sess.query(Child1).options(subqueryload('left_child2'))
+ for row in query_.all():
+ assert row.left_child2
+
+class EagerToSubclassTest(fixtures.MappedTest):
+ """Test eager loads to subclass mappers"""
+
+ run_setup_classes = 'once'
+ run_setup_mappers = 'once'
+ run_inserts = 'once'
+ run_deletes = None
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table('parent', metadata,
+ Column('id', Integer,
+ primary_key=True,
+ test_needs_autoincrement=True),
+ Column('data', String(10)))
+
+ Table('base', metadata,
+ Column('id', Integer,
+ primary_key=True,
+ test_needs_autoincrement=True),
+ Column('type', String(10)),
+ Column('related_id', Integer,
+ ForeignKey('related.id')))
+
+ Table('sub', metadata,
+ Column('id', Integer,
+ ForeignKey('base.id'),
+ primary_key=True),
+ Column('data', String(10)),
+ Column('parent_id', Integer,
+ ForeignKey('parent.id'),
+ nullable=False))
+
+ Table('related', metadata,
+ Column('id', Integer,
+ primary_key=True,
+ test_needs_autoincrement=True),
+ Column('data', String(10)))
+
+ @classmethod
+ def setup_classes(cls):
+ class Parent(cls.Comparable):
+ pass
+ class Base(cls.Comparable):
+ pass
+ class Sub(Base):
+ pass
+ class Related(cls.Comparable):
+ pass
+
+ @classmethod
+ def setup_mappers(cls):
+ sub = cls.tables.sub
+ Sub = cls.classes.Sub
+ base = cls.tables.base
+ Base = cls.classes.Base
+ parent = cls.tables.parent
+ Parent = cls.classes.Parent
+ related = cls.tables.related
+ Related = cls.classes.Related
+
+ mapper(Parent, parent,
+ properties={'children':relationship(Sub, order_by=sub.c.data)})
+
+ mapper(Base, base,
+ polymorphic_on=base.c.type,
+ polymorphic_identity='b',
+ properties={'related':relationship(Related)})
+
+ mapper(Sub, sub,
+ inherits=Base,
+ polymorphic_identity='s')
+
+ mapper(Related, related)
+
+ @classmethod
+ def insert_data(cls):
+ global p1, p2
+
+ Parent = cls.classes.Parent
+ Sub = cls.classes.Sub
+ Related = cls.classes.Related
+ sess = Session()
+ r1, r2 = Related(data='r1'), Related(data='r2')
+ s1 = Sub(data='s1', related=r1)
+ s2 = Sub(data='s2', related=r2)
+ s3 = Sub(data='s3')
+ s4 = Sub(data='s4', related=r2)
+ s5 = Sub(data='s5')
+ p1 = Parent(data='p1', children=[s1, s2, s3])
+ p2 = Parent(data='p2', children=[s4, s5])
+ sess.add(p1)
+ sess.add(p2)
+ sess.commit()
+
+ def test_joinedload(self):
+ Parent = self.classes.Parent
+ sess = Session()
+ def go():
+ eq_(sess.query(Parent)
+ .options(joinedload(Parent.children)).all(),
+ [p1, p2])
+ self.assert_sql_count(testing.db, go, 1)
+
+ def test_contains_eager(self):
+ Parent = self.classes.Parent
+ Sub = self.classes.Sub
+ sess = Session()
+ def go():
+ eq_(sess.query(Parent)
+ .join(Parent.children)
+ .options(contains_eager(Parent.children))
+ .order_by(Parent.data, Sub.data).all(),
+ [p1, p2])
+ self.assert_sql_count(testing.db, go, 1)
+
+ def test_subq_through_related(self):
+ Parent = self.classes.Parent
+ Sub = self.classes.Sub
+ sess = Session()
+ def go():
+ eq_(sess.query(Parent)
+ .options(subqueryload_all(Parent.children, Sub.related))
+ .order_by(Parent.data).all(),
+ [p1, p2])
+ self.assert_sql_count(testing.db, go, 3)
+
+class SubClassEagerToSubClassTest(fixtures.MappedTest):
+ """Test joinedloads from subclass to subclass mappers"""
+
+ run_setup_classes = 'once'
+ run_setup_mappers = 'once'
+ run_inserts = 'once'
+ run_deletes = None
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table('parent', metadata,
+ Column('id', Integer,
+ primary_key=True,
+ test_needs_autoincrement=True),
+ Column('type', String(10)),
+ )
+
+ Table('subparent', metadata,
+ Column('id', Integer,
+ ForeignKey('parent.id'),
+ primary_key=True),
+ Column('data', String(10)),
+ )
+
+ Table('base', metadata,
+ Column('id', Integer,
+ primary_key=True,
+ test_needs_autoincrement=True),
+ Column('type', String(10)),
+ )
+
+ Table('sub', metadata,
+ Column('id', Integer,
+ ForeignKey('base.id'),
+ primary_key=True),
+ Column('data', String(10)),
+ Column('subparent_id', Integer,
+ ForeignKey('subparent.id'),
+ nullable=False)
+ )
+
+ @classmethod
+ def setup_classes(cls):
+ class Parent(cls.Comparable):
+ pass
+ class Subparent(Parent):
+ pass
+ class Base(cls.Comparable):
+ pass
+ class Sub(Base):
+ pass
+
+ @classmethod
+ def setup_mappers(cls):
+ sub = cls.tables.sub
+ Sub = cls.classes.Sub
+ base = cls.tables.base
+ Base = cls.classes.Base
+ parent = cls.tables.parent
+ Parent = cls.classes.Parent
+ subparent = cls.tables.subparent
+ Subparent = cls.classes.Subparent
+
+ mapper(Parent, parent,
+ polymorphic_on=parent.c.type,
+ polymorphic_identity='b')
+
+ mapper(Subparent, subparent,
+ inherits=Parent,
+ polymorphic_identity='s',
+ properties={
+ 'children':relationship(Sub, order_by=base.c.id)})
+
+ mapper(Base, base,
+ polymorphic_on=base.c.type,
+ polymorphic_identity='b')
+
+ mapper(Sub, sub,
+ inherits=Base,
+ polymorphic_identity='s')
+
+ @classmethod
+ def insert_data(cls):
+ global p1, p2
+
+ Sub, Subparent = cls.classes.Sub, cls.classes.Subparent
+ sess = create_session()
+ p1 = Subparent(
+ data='p1',
+ children=[Sub(data='s1'), Sub(data='s2'), Sub(data='s3')])
+ p2 = Subparent(
+ data='p2',
+ children=[Sub(data='s4'), Sub(data='s5')])
+ sess.add(p1)
+ sess.add(p2)
+ sess.flush()
+
+ def test_joinedload(self):
+ Subparent = self.classes.Subparent
+
+ sess = create_session()
+ def go():
+ eq_(sess.query(Subparent)
+ .options(joinedload(Subparent.children)).all(),
+ [p1, p2])
+ self.assert_sql_count(testing.db, go, 1)
+
+ sess.expunge_all()
+ def go():
+ eq_(sess.query(Subparent)
+ .options(joinedload("children")).all(),
+ [p1, p2])
+ self.assert_sql_count(testing.db, go, 1)
+
+ def test_contains_eager(self):
+ Subparent = self.classes.Subparent
+
+ sess = create_session()
+ def go():
+ eq_(sess.query(Subparent)
+ .join(Subparent.children)
+ .options(contains_eager(Subparent.children)).all(),
+ [p1, p2])
+ self.assert_sql_count(testing.db, go, 1)
+
+ sess.expunge_all()
+ def go():
+ eq_(sess.query(Subparent)
+ .join(Subparent.children)
+ .options(contains_eager("children")).all(),
+ [p1, p2])
+ self.assert_sql_count(testing.db, go, 1)
+
+ def test_subqueryload(self):
+ Subparent = self.classes.Subparent
+
+ sess = create_session()
+ def go():
+ eq_(sess.query(Subparent)
+ .options(subqueryload(Subparent.children)).all(),
+ [p1, p2])
+ self.assert_sql_count(testing.db, go, 2)
+
+ sess.expunge_all()
+ def go():
+ eq_(sess.query(Subparent)
+ .options(subqueryload("children")).all(),
+ [p1, p2])
+ self.assert_sql_count(testing.db, go, 2)
+