]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- check for tables in the primaryjoin/secondaryjoin that arent parent of parent/child...
authorMike Bayer <mike_mp@zzzcomputing.com>
Sun, 18 Mar 2007 21:45:55 +0000 (21:45 +0000)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sun, 18 Mar 2007 21:45:55 +0000 (21:45 +0000)
dont include those clauses when looking for foreign_keys (which also takes care of remote_side).
if those cols are present in foreign_keys, lazyloader makes binds out of them and tries to
target those columns on the mapper, raising either the "conflicting column" error if they have the same
name, or the "cant find column on mapping" if it has a unique name.  added tests for both.

CHANGES
lib/sqlalchemy/orm/properties.py
test/orm/relationships.py

diff --git a/CHANGES b/CHANGES
index 6b39299e197728c55f2b2a9877e482021c4255b2..2c73d0ab3af1f4120f57fd44f57586df262d7675 100644 (file)
--- a/CHANGES
+++ b/CHANGES
       [ticket:493]. also fixes to detection of "direction", more specific
       targeting of columns that belong to the polymorphic union vs. those
       that dont.
+      
+    - some fixes to relationship calcs when using "view_only=True" to pull
+      in other tables into the join condition which arent parent of the
+      relationship's parent/child mappings
 
     - flush fixes on self-referential relationships that contain references
       to other instances outside of the cyclical chain, when the initial
index dbf58946fe942ce18e52ac2647a9775659ebcbc3..dc10a68552c4f8dfcf3d224f65856bcc5ffb3a03 100644 (file)
@@ -242,6 +242,16 @@ class PropertyLoader(StrategizedProperty):
     def _determine_fks(self):
         if len(self._legacy_foreignkey) and not self._is_self_referential():
             self.foreign_keys = self._legacy_foreignkey
+
+        def col_is_part_of_mappings(col):
+            if self.secondary is None:
+                return self.parent.unjoined_table.corresponding_column(col, raiseerr=False) is not None or \
+                    self.target.corresponding_column(col, raiseerr=False) is not None
+            else:
+                return self.parent.unjoined_table.corresponding_column(col, raiseerr=False) is not None or \
+                    self.target.corresponding_column(col, raiseerr=False) is not None or \
+                    self.secondary.corresponding_column(col, raiseerr=False) is not None
+
         if len(self.foreign_keys):
             self._opposite_side = util.Set()
             def visit_binary(binary):
@@ -260,6 +270,13 @@ class PropertyLoader(StrategizedProperty):
             def visit_binary(binary):
                 if binary.operator != '=' or not isinstance(binary.left, schema.Column) or not isinstance(binary.right, schema.Column):
                     return
+
+                # this check is for when the user put the "view_only" flag on and has tables that have nothing
+                # to do with the relationship's parent/child mappings in the join conditions.  we dont want cols
+                # or clauses related to those external tables dealt with.  see orm.relationships.ViewOnlyTest
+                if not col_is_part_of_mappings(binary.left) or not col_is_part_of_mappings(binary.right):
+                    return
+                        
                 for f in binary.left.foreign_keys:
                     if f.references(binary.right.table):
                         self.foreign_keys.add(binary.left)
@@ -275,6 +292,7 @@ class PropertyLoader(StrategizedProperty):
             if self.secondaryjoin is not None:
                 mapperutil.BinaryVisitor(visit_binary).traverse(self.secondaryjoin)
 
+
     def _determine_direction(self):
         """Determine our *direction*, i.e. do we represent one to
         many, many to many, etc.
index 66ad3fae61f65540b4fa5c03c520fefc7515c2a7..c456120555dfba898d981fc750691c2d5e3398e2 100644 (file)
@@ -672,6 +672,108 @@ class TypeMatchTest(testbase.ORMTest):
             assert False
         except exceptions.AssertionError, err:
             assert str(err) == "Attribute 'a' on class '%s' doesn't handle objects of type '%s'" % (D, B)
-                
+
+class ViewOnlyTest(testbase.ORMTest):
+    """test a view_only mapping where a third table is pulled into the primary join condition,
+    using overlapping PK column names (should not produce "conflicting column" error)"""
+    def define_tables(self, metadata):
+        global t1, t2, t3
+        t1 = Table("t1", metadata,
+            Column('id', Integer, primary_key=True),
+            Column('data', String(40)))
+        t2 = Table("t2", metadata,
+            Column('id', Integer, primary_key=True),
+            Column('data', String(40)),
+            Column('t1id', Integer, ForeignKey('t1.id')))
+        t3 = Table("t3", metadata,
+            Column('id', Integer, primary_key=True),
+            Column('data', String(40)),
+            Column('t2id', Integer, ForeignKey('t2.id'))
+            )
+            
+    def test_basic(self):
+        class C1(object):pass
+        class C2(object):pass
+        class C3(object):pass
+        
+        mapper(C1, t1, properties={
+            't2s':relation(C2),
+            't2_view':relation(C2, viewonly=True, primaryjoin=and_(t1.c.id==t2.c.t1id, t3.c.t2id==t2.c.id, t3.c.data==t1.c.data))
+        })
+        mapper(C2, t2)
+        mapper(C3, t3, properties={
+            't2':relation(C2)
+        })
+        
+        c1 = C1()
+        c1.data = 'c1data'
+        c2a = C2()
+        c1.t2s.append(c2a)
+        c2b = C2()
+        c1.t2s.append(c2b)
+        c3 = C3()
+        c3.data='c1data'
+        c3.t2 = c2b
+        sess = create_session()
+        sess.save(c1)
+        sess.save(c3)
+        sess.flush()
+        sess.clear()
+        
+        c1 = sess.query(C1).get(c1.id)
+        assert set([x.id for x in c1.t2s]) == set([c2a.id, c2b.id])
+        assert set([x.id for x in c1.t2_view]) == set([c2b.id])
+
+class ViewOnlyTest2(testbase.ORMTest):
+    """test a view_only mapping where a third table is pulled into the primary join condition,
+    using non-overlapping PK column names (should not produce "mapper has no column X" error)"""
+    def define_tables(self, metadata):
+        global t1, t2, t3
+        t1 = Table("t1", metadata,
+            Column('t1id', Integer, primary_key=True),
+            Column('data', String(40)))
+        t2 = Table("t2", metadata,
+            Column('t2id', Integer, primary_key=True),
+            Column('data', String(40)),
+            Column('t1id_ref', Integer, ForeignKey('t1.t1id')))
+        t3 = Table("t3", metadata,
+            Column('t3id', Integer, primary_key=True),
+            Column('data', String(40)),
+            Column('t2id_ref', Integer, ForeignKey('t2.t2id'))
+            )
+    def test_basic(self):
+        class C1(object):pass
+        class C2(object):pass
+        class C3(object):pass
+
+        mapper(C1, t1, properties={
+            't2s':relation(C2),
+            't2_view':relation(C2, viewonly=True, primaryjoin=and_(t1.c.t1id==t2.c.t1id_ref, t3.c.t2id_ref==t2.c.t2id, t3.c.data==t1.c.data))
+        })
+        mapper(C2, t2)
+        mapper(C3, t3, properties={
+            't2':relation(C2)
+        })
+
+        c1 = C1()
+        c1.data = 'c1data'
+        c2a = C2()
+        c1.t2s.append(c2a)
+        c2b = C2()
+        c1.t2s.append(c2b)
+        c3 = C3()
+        c3.data='c1data'
+        c3.t2 = c2b
+        sess = create_session()
+        sess.save(c1)
+        sess.save(c3)
+        sess.flush()
+        sess.clear()
+
+        c1 = sess.query(C1).get(c1.t1id)
+        assert set([x.t2id for x in c1.t2s]) == set([c2a.t2id, c2b.t2id])
+        assert set([x.t2id for x in c1.t2_view]) == set([c2b.t2id])
+        
+        
 if __name__ == "__main__":
     testbase.main()