From 8f788f683ba64e7e0c495454883ca18dd2ae5f11 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Sun, 18 Mar 2007 21:45:55 +0000 Subject: [PATCH] - check for tables in the primaryjoin/secondaryjoin that arent parent of parent/child mappers. dont include those clauses when looking for foreign_keys (which also takes care of remote_side). if those cols are present in foreign_keys, lazyloader makes binds out of them and tries to target those columns on the mapper, raising either the "conflicting column" error if they have the same name, or the "cant find column on mapping" if it has a unique name. added tests for both. --- CHANGES | 4 ++ lib/sqlalchemy/orm/properties.py | 18 ++++++ test/orm/relationships.py | 104 ++++++++++++++++++++++++++++++- 3 files changed, 125 insertions(+), 1 deletion(-) diff --git a/CHANGES b/CHANGES index 6b39299e19..2c73d0ab3a 100644 --- a/CHANGES +++ b/CHANGES @@ -125,6 +125,10 @@ [ticket:493]. also fixes to detection of "direction", more specific targeting of columns that belong to the polymorphic union vs. those that dont. + + - some fixes to relationship calcs when using "view_only=True" to pull + in other tables into the join condition which arent parent of the + relationship's parent/child mappings - flush fixes on self-referential relationships that contain references to other instances outside of the cyclical chain, when the initial diff --git a/lib/sqlalchemy/orm/properties.py b/lib/sqlalchemy/orm/properties.py index dbf58946fe..dc10a68552 100644 --- a/lib/sqlalchemy/orm/properties.py +++ b/lib/sqlalchemy/orm/properties.py @@ -242,6 +242,16 @@ class PropertyLoader(StrategizedProperty): def _determine_fks(self): if len(self._legacy_foreignkey) and not self._is_self_referential(): self.foreign_keys = self._legacy_foreignkey + + def col_is_part_of_mappings(col): + if self.secondary is None: + return self.parent.unjoined_table.corresponding_column(col, raiseerr=False) is not None or \ + self.target.corresponding_column(col, raiseerr=False) is not None + else: + return self.parent.unjoined_table.corresponding_column(col, raiseerr=False) is not None or \ + self.target.corresponding_column(col, raiseerr=False) is not None or \ + self.secondary.corresponding_column(col, raiseerr=False) is not None + if len(self.foreign_keys): self._opposite_side = util.Set() def visit_binary(binary): @@ -260,6 +270,13 @@ class PropertyLoader(StrategizedProperty): def visit_binary(binary): if binary.operator != '=' or not isinstance(binary.left, schema.Column) or not isinstance(binary.right, schema.Column): return + + # this check is for when the user put the "view_only" flag on and has tables that have nothing + # to do with the relationship's parent/child mappings in the join conditions. we dont want cols + # or clauses related to those external tables dealt with. see orm.relationships.ViewOnlyTest + if not col_is_part_of_mappings(binary.left) or not col_is_part_of_mappings(binary.right): + return + for f in binary.left.foreign_keys: if f.references(binary.right.table): self.foreign_keys.add(binary.left) @@ -275,6 +292,7 @@ class PropertyLoader(StrategizedProperty): if self.secondaryjoin is not None: mapperutil.BinaryVisitor(visit_binary).traverse(self.secondaryjoin) + def _determine_direction(self): """Determine our *direction*, i.e. do we represent one to many, many to many, etc. diff --git a/test/orm/relationships.py b/test/orm/relationships.py index 66ad3fae61..c456120555 100644 --- a/test/orm/relationships.py +++ b/test/orm/relationships.py @@ -672,6 +672,108 @@ class TypeMatchTest(testbase.ORMTest): assert False except exceptions.AssertionError, err: assert str(err) == "Attribute 'a' on class '%s' doesn't handle objects of type '%s'" % (D, B) - + +class ViewOnlyTest(testbase.ORMTest): + """test a view_only mapping where a third table is pulled into the primary join condition, + using overlapping PK column names (should not produce "conflicting column" error)""" + def define_tables(self, metadata): + global t1, t2, t3 + t1 = Table("t1", metadata, + Column('id', Integer, primary_key=True), + Column('data', String(40))) + t2 = Table("t2", metadata, + Column('id', Integer, primary_key=True), + Column('data', String(40)), + Column('t1id', Integer, ForeignKey('t1.id'))) + t3 = Table("t3", metadata, + Column('id', Integer, primary_key=True), + Column('data', String(40)), + Column('t2id', Integer, ForeignKey('t2.id')) + ) + + def test_basic(self): + class C1(object):pass + class C2(object):pass + class C3(object):pass + + mapper(C1, t1, properties={ + 't2s':relation(C2), + 't2_view':relation(C2, viewonly=True, primaryjoin=and_(t1.c.id==t2.c.t1id, t3.c.t2id==t2.c.id, t3.c.data==t1.c.data)) + }) + mapper(C2, t2) + mapper(C3, t3, properties={ + 't2':relation(C2) + }) + + c1 = C1() + c1.data = 'c1data' + c2a = C2() + c1.t2s.append(c2a) + c2b = C2() + c1.t2s.append(c2b) + c3 = C3() + c3.data='c1data' + c3.t2 = c2b + sess = create_session() + sess.save(c1) + sess.save(c3) + sess.flush() + sess.clear() + + c1 = sess.query(C1).get(c1.id) + assert set([x.id for x in c1.t2s]) == set([c2a.id, c2b.id]) + assert set([x.id for x in c1.t2_view]) == set([c2b.id]) + +class ViewOnlyTest2(testbase.ORMTest): + """test a view_only mapping where a third table is pulled into the primary join condition, + using non-overlapping PK column names (should not produce "mapper has no column X" error)""" + def define_tables(self, metadata): + global t1, t2, t3 + t1 = Table("t1", metadata, + Column('t1id', Integer, primary_key=True), + Column('data', String(40))) + t2 = Table("t2", metadata, + Column('t2id', Integer, primary_key=True), + Column('data', String(40)), + Column('t1id_ref', Integer, ForeignKey('t1.t1id'))) + t3 = Table("t3", metadata, + Column('t3id', Integer, primary_key=True), + Column('data', String(40)), + Column('t2id_ref', Integer, ForeignKey('t2.t2id')) + ) + def test_basic(self): + class C1(object):pass + class C2(object):pass + class C3(object):pass + + mapper(C1, t1, properties={ + 't2s':relation(C2), + 't2_view':relation(C2, viewonly=True, primaryjoin=and_(t1.c.t1id==t2.c.t1id_ref, t3.c.t2id_ref==t2.c.t2id, t3.c.data==t1.c.data)) + }) + mapper(C2, t2) + mapper(C3, t3, properties={ + 't2':relation(C2) + }) + + c1 = C1() + c1.data = 'c1data' + c2a = C2() + c1.t2s.append(c2a) + c2b = C2() + c1.t2s.append(c2b) + c3 = C3() + c3.data='c1data' + c3.t2 = c2b + sess = create_session() + sess.save(c1) + sess.save(c3) + sess.flush() + sess.clear() + + c1 = sess.query(C1).get(c1.t1id) + assert set([x.t2id for x in c1.t2s]) == set([c2a.t2id, c2b.t2id]) + assert set([x.t2id for x in c1.t2_view]) == set([c2b.t2id]) + + if __name__ == "__main__": testbase.main() -- 2.47.2