"all changes on mapped instances before merging with "
"load=False.")
key = mapper._identity_key_from_state(state)
+ key_is_persistent = attributes.NEVER_SET not in key[1]
+ else:
+ key_is_persistent = True
if key in self.identity_map:
merged = self.identity_map[key]
self._update_impl(merged_state)
new_instance = True
- elif not _none_set.intersection(key[1]) or \
+ elif key_is_persistent and (
+ not _none_set.intersection(key[1]) or
(mapper.allow_partial_pks and
- not _none_set.issuperset(key[1])):
+ not _none_set.issuperset(key[1]))):
merged = self.query(mapper.class_).get(key[1])
else:
merged = None
from sqlalchemy.util import OrderedSet
from sqlalchemy.orm import mapper, relationship, create_session, \
PropComparator, synonym, comparable_property, sessionmaker, \
- attributes, Session, backref, configure_mappers
+ attributes, Session, backref, configure_mappers, foreign
from sqlalchemy.orm.collections import attribute_mapped_collection
from sqlalchemy.orm.interfaces import MapperOption
from sqlalchemy.testing import eq_, ne_
eq_(u2.addresses[1].email_address, 'afafds')
eq_(load.called, 21)
+ def test_dont_send_neverset_to_get(self):
+ # test issue #3647
+ CompositePk, composite_pk_table = (
+ self.classes.CompositePk, self.tables.composite_pk_table
+ )
+ mapper(CompositePk, composite_pk_table)
+ cp1 = CompositePk(j=1, k=1)
+
+ sess = Session()
+
+ rec = []
+
+ def go():
+ rec.append(sess.merge(cp1))
+ self.assert_sql_count(testing.db, go, 0)
+ rec[0].i = 5
+ sess.commit()
+ eq_(rec[0].i, 5)
+
+ def test_dont_send_neverset_to_get_w_relationship(self):
+ # test issue #3647
+ CompositePk, composite_pk_table = (
+ self.classes.CompositePk, self.tables.composite_pk_table
+ )
+ User, users = (
+ self.classes.User, self.tables.users
+ )
+ mapper(User, users, properties={
+ 'elements': relationship(
+ CompositePk,
+ primaryjoin=users.c.id == foreign(composite_pk_table.c.i))
+ })
+ mapper(CompositePk, composite_pk_table)
+
+ u1 = User(id=5, name='some user')
+ cp1 = CompositePk(j=1, k=1)
+ u1.elements.append(cp1)
+ sess = Session()
+
+ rec = []
+
+ def go():
+ rec.append(sess.merge(u1))
+ self.assert_sql_count(testing.db, go, 1)
+ u2 = rec[0]
+ sess.commit()
+ eq_(u2.elements[0].i, 5)
+ eq_(u2.id, 5)
+
def test_no_relationship_cascade(self):
"""test that merge doesn't interfere with a relationship()
target that specifically doesn't include 'merge' cascade.