no PK columns, would not detect that the joined table had no PK.
different m2m tables. this raises an error in 0.3 but is
possible in 0.4 when aliases are used. [ticket:687]
- fixed small exception throw bug in Session.merge()
+ - fixed bug where mapper, being linked to a join where one table had
+ no PK columns, would not detect that the joined table had no PK.
- engine
- fixed another occasional race condition which could occur
when using pool with threadlocal setting
def _has_pks(self, table):
try:
- for k in self.pks_by_table[table]:
+ pk = self.pks_by_table[table]
+ if not pk:
+ return False
+ for k in pk:
if not self.columntoproperty.has_key(k):
return False
else:
q = create_session().query(m)
l = q.select()
self.assert_result(l, User, *user_result[0:2])
+
+ def testmappingtojoinnopk(self):
+ metadata = MetaData()
+ account_ids_table = Table('account_ids', metadata,
+ Column('account_id', Integer, primary_key=True),
+ Column('username', String(20)))
+ account_stuff_table = Table('account_stuff', metadata,
+ Column('account_id', Integer, ForeignKey('account_ids.account_id')),
+ Column('credit', Numeric))
+ class A(object):pass
+ m = mapper(A, account_ids_table.join(account_stuff_table))
+ m.compile()
+ assert m._has_pks(account_ids_table)
+ assert not m._has_pks(account_stuff_table)
+ metadata.create_all(testbase.db)
+ try:
+ sess = create_session(bind=testbase.db)
+ a = A()
+ sess.save(a)
+ sess.flush()
+ assert testbase.db.execute(account_ids_table.count()).scalar() == 1
+ assert testbase.db.execute(account_stuff_table.count()).scalar() == 0
+ finally:
+ metadata.drop_all(testbase.db)
def testmappingtoouterjoin(self):
"""test mapping to an outer join, with a composite primary key that allows nulls"""
ctx.current.clear()
l = Query(m).select(items.c.item_name.in_(*[e['item_name'] for e in data[1:]]), order_by=[items.c.item_name])
self.assert_result(l, *data)
-
- def testm2mmultitable(self):
- # many-to-many join on an association table
- j = join(users, userkeywords,
- users.c.user_id==userkeywords.c.user_id).join(keywords,
- userkeywords.c.keyword_id==keywords.c.keyword_id)
- print "PK", j.primary_key
- # a class
- class KeywordUser(object):
- pass
-
- # map to it - the identity of a KeywordUser object will be
- # (user_id, keyword_id) since those are the primary keys involved
- m = mapper(KeywordUser, j, properties={
- 'user_id':[users.c.user_id, userkeywords.c.user_id],
- 'keyword_id':[userkeywords.c.keyword_id, keywords.c.keyword_id],
- 'keyword_name':keywords.c.name,
- }, )
-
- k = KeywordUser()
- k.user_name = 'keyworduser'
- k.keyword_name = 'a keyword'
- ctx.current.flush()
- print m.instance_key(k)
-
- id = (k.user_id, k.keyword_id)
- ctx.current.clear()
- k = ctx.current.query(KeywordUser).get(id)
- assert k.user_name == 'keyworduser'
- assert k.keyword_name == 'a keyword'
-
class SaveTest2(UnitOfWorkTest):