def _determine_fks(self):
if len(self._legacy_foreignkey) and not self._is_self_referential():
self.foreign_keys = self._legacy_foreignkey
+
+ def col_is_part_of_mappings(col):
+ if self.secondary is None:
+ return self.parent.unjoined_table.corresponding_column(col, raiseerr=False) is not None or \
+ self.target.corresponding_column(col, raiseerr=False) is not None
+ else:
+ return self.parent.unjoined_table.corresponding_column(col, raiseerr=False) is not None or \
+ self.target.corresponding_column(col, raiseerr=False) is not None or \
+ self.secondary.corresponding_column(col, raiseerr=False) is not None
+
if len(self.foreign_keys):
self._opposite_side = util.Set()
def visit_binary(binary):
def visit_binary(binary):
if binary.operator != '=' or not isinstance(binary.left, schema.Column) or not isinstance(binary.right, schema.Column):
return
+
+ # this check is for when the user put the "view_only" flag on and has tables that have nothing
+ # to do with the relationship's parent/child mappings in the join conditions. we dont want cols
+ # or clauses related to those external tables dealt with. see orm.relationships.ViewOnlyTest
+ if not col_is_part_of_mappings(binary.left) or not col_is_part_of_mappings(binary.right):
+ return
+
for f in binary.left.foreign_keys:
if f.references(binary.right.table):
self.foreign_keys.add(binary.left)
if self.secondaryjoin is not None:
mapperutil.BinaryVisitor(visit_binary).traverse(self.secondaryjoin)
+
def _determine_direction(self):
"""Determine our *direction*, i.e. do we represent one to
many, many to many, etc.
assert False
except exceptions.AssertionError, err:
assert str(err) == "Attribute 'a' on class '%s' doesn't handle objects of type '%s'" % (D, B)
-
+
+class ViewOnlyTest(testbase.ORMTest):
+ """test a view_only mapping where a third table is pulled into the primary join condition,
+ using overlapping PK column names (should not produce "conflicting column" error)"""
+ def define_tables(self, metadata):
+ global t1, t2, t3
+ t1 = Table("t1", metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(40)))
+ t2 = Table("t2", metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(40)),
+ Column('t1id', Integer, ForeignKey('t1.id')))
+ t3 = Table("t3", metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(40)),
+ Column('t2id', Integer, ForeignKey('t2.id'))
+ )
+
+ def test_basic(self):
+ class C1(object):pass
+ class C2(object):pass
+ class C3(object):pass
+
+ mapper(C1, t1, properties={
+ 't2s':relation(C2),
+ 't2_view':relation(C2, viewonly=True, primaryjoin=and_(t1.c.id==t2.c.t1id, t3.c.t2id==t2.c.id, t3.c.data==t1.c.data))
+ })
+ mapper(C2, t2)
+ mapper(C3, t3, properties={
+ 't2':relation(C2)
+ })
+
+ c1 = C1()
+ c1.data = 'c1data'
+ c2a = C2()
+ c1.t2s.append(c2a)
+ c2b = C2()
+ c1.t2s.append(c2b)
+ c3 = C3()
+ c3.data='c1data'
+ c3.t2 = c2b
+ sess = create_session()
+ sess.save(c1)
+ sess.save(c3)
+ sess.flush()
+ sess.clear()
+
+ c1 = sess.query(C1).get(c1.id)
+ assert set([x.id for x in c1.t2s]) == set([c2a.id, c2b.id])
+ assert set([x.id for x in c1.t2_view]) == set([c2b.id])
+
+class ViewOnlyTest2(testbase.ORMTest):
+ """test a view_only mapping where a third table is pulled into the primary join condition,
+ using non-overlapping PK column names (should not produce "mapper has no column X" error)"""
+ def define_tables(self, metadata):
+ global t1, t2, t3
+ t1 = Table("t1", metadata,
+ Column('t1id', Integer, primary_key=True),
+ Column('data', String(40)))
+ t2 = Table("t2", metadata,
+ Column('t2id', Integer, primary_key=True),
+ Column('data', String(40)),
+ Column('t1id_ref', Integer, ForeignKey('t1.t1id')))
+ t3 = Table("t3", metadata,
+ Column('t3id', Integer, primary_key=True),
+ Column('data', String(40)),
+ Column('t2id_ref', Integer, ForeignKey('t2.t2id'))
+ )
+ def test_basic(self):
+ class C1(object):pass
+ class C2(object):pass
+ class C3(object):pass
+
+ mapper(C1, t1, properties={
+ 't2s':relation(C2),
+ 't2_view':relation(C2, viewonly=True, primaryjoin=and_(t1.c.t1id==t2.c.t1id_ref, t3.c.t2id_ref==t2.c.t2id, t3.c.data==t1.c.data))
+ })
+ mapper(C2, t2)
+ mapper(C3, t3, properties={
+ 't2':relation(C2)
+ })
+
+ c1 = C1()
+ c1.data = 'c1data'
+ c2a = C2()
+ c1.t2s.append(c2a)
+ c2b = C2()
+ c1.t2s.append(c2b)
+ c3 = C3()
+ c3.data='c1data'
+ c3.t2 = c2b
+ sess = create_session()
+ sess.save(c1)
+ sess.save(c3)
+ sess.flush()
+ sess.clear()
+
+ c1 = sess.query(C1).get(c1.t1id)
+ assert set([x.t2id for x in c1.t2s]) == set([c2a.t2id, c2b.t2id])
+ assert set([x.t2id for x in c1.t2_view]) == set([c2b.t2id])
+
+
if __name__ == "__main__":
testbase.main()