]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- fixed bug where mapper, being linked to a join where one table had
authorMike Bayer <mike_mp@zzzcomputing.com>
Fri, 10 Aug 2007 14:49:08 +0000 (14:49 +0000)
committerMike Bayer <mike_mp@zzzcomputing.com>
Fri, 10 Aug 2007 14:49:08 +0000 (14:49 +0000)
no PK columns, would not detect that the joined table had no PK.

CHANGES
lib/sqlalchemy/orm/mapper.py
test/orm/mapper.py
test/orm/unitofwork.py

diff --git a/CHANGES b/CHANGES
index 1d952c8d708737d592f3e7087d3f707cb1945ab3..7c2adea706be1736602d3d3267963d2a7293db24 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -8,6 +8,8 @@
       different m2m tables.  this raises an error in 0.3 but is 
       possible in 0.4 when aliases are used. [ticket:687]
     - fixed small exception throw bug in Session.merge()
+    - fixed bug where mapper, being linked to a join where one table had 
+      no PK columns, would not detect that the joined table had no PK.
 - engine
     - fixed another occasional race condition which could occur
       when using pool with threadlocal setting
index 375408926fb897d958ee105e37414327f9d11de2..5c3d7acc29a2e90f2ee089c47ae1d0e42a7e9881 100644 (file)
@@ -1327,7 +1327,10 @@ class Mapper(object):
 
     def _has_pks(self, table):
         try:
-            for k in self.pks_by_table[table]:
+            pk = self.pks_by_table[table]
+            if not pk:
+                return False
+            for k in pk:
                 if not self.columntoproperty.has_key(k):
                     return False
             else:
index c0297e51492ffd2a75ca3469d6f60f00e62517c1..77776e73e867267a2dbe0405295e135352077259 100644 (file)
@@ -285,6 +285,30 @@ class MapperTest(MapperSuperTest):
         q = create_session().query(m)
         l = q.select()
         self.assert_result(l, User, *user_result[0:2])
+    
+    def testmappingtojoinnopk(self):
+        metadata = MetaData()
+        account_ids_table = Table('account_ids', metadata,
+                Column('account_id', Integer, primary_key=True),
+                Column('username', String(20)))
+        account_stuff_table = Table('account_stuff', metadata,
+                Column('account_id', Integer, ForeignKey('account_ids.account_id')),
+                Column('credit', Numeric))
+        class A(object):pass
+        m = mapper(A, account_ids_table.join(account_stuff_table))
+        m.compile()
+        assert m._has_pks(account_ids_table)
+        assert not m._has_pks(account_stuff_table)
+        metadata.create_all(testbase.db)
+        try:
+            sess = create_session(bind=testbase.db)
+            a = A()
+            sess.save(a)
+            sess.flush()
+            assert testbase.db.execute(account_ids_table.count()).scalar() == 1
+            assert testbase.db.execute(account_stuff_table.count()).scalar() == 0
+        finally:
+            metadata.drop_all(testbase.db)
         
     def testmappingtoouterjoin(self):
         """test mapping to an outer join, with a composite primary key that allows nulls"""
index 6ba3f8c4b95fe1c3238f2ee8bdf0ee39922ef8a5..42bc4d1ad896601b2764d680c2356e369e29407c 100644 (file)
@@ -1389,37 +1389,6 @@ class ManyToManyTest(UnitOfWorkTest):
         ctx.current.clear()
         l = Query(m).select(items.c.item_name.in_(*[e['item_name'] for e in data[1:]]), order_by=[items.c.item_name])
         self.assert_result(l, *data)
-
-    def testm2mmultitable(self):
-        # many-to-many join on an association table
-        j = join(users, userkeywords, 
-                users.c.user_id==userkeywords.c.user_id).join(keywords, 
-                   userkeywords.c.keyword_id==keywords.c.keyword_id)
-        print "PK", j.primary_key
-        # a class 
-        class KeywordUser(object):
-            pass
-
-        # map to it - the identity of a KeywordUser object will be
-        # (user_id, keyword_id) since those are the primary keys involved
-        m = mapper(KeywordUser, j, properties={
-            'user_id':[users.c.user_id, userkeywords.c.user_id],
-            'keyword_id':[userkeywords.c.keyword_id, keywords.c.keyword_id],
-            'keyword_name':keywords.c.name,
-        }, )
-
-        k = KeywordUser()
-        k.user_name = 'keyworduser'
-        k.keyword_name = 'a keyword'
-        ctx.current.flush()
-        print m.instance_key(k)
-        
-        id = (k.user_id, k.keyword_id)
-        ctx.current.clear()
-        k = ctx.current.query(KeywordUser).get(id)
-        assert k.user_name == 'keyworduser'
-        assert k.keyword_name == 'a keyword'
-
     
 class SaveTest2(UnitOfWorkTest):