would populate the related object's "subclass" table with
data from the "subclass" table of the parent.
[ticket:1485]
+
+ - Fixed a needless select which would occur when merging
+ transient objects that contained a null primary key
+ identifier. [ticket:1618]
- relations() now have greater ability to be "overridden",
meaning a subclass that explicitly specifies a relation()
_mapper_registry = weakref.WeakKeyDictionary()
_new_mappers = False
_already_compiling = False
+_none_set = frozenset([None])
# a list of MapperExtensions that will be installed in all mappers by default
global_extensions = []
self._log_debug("_instance(): identity key %s not in session" % (identitykey,))
if self.allow_null_pks:
- for x in identitykey[1]:
- if x is not None:
- break
- else:
+ if _none_set.issuperset(identitykey[1]):
return None
else:
if None in identitykey[1]:
from sqlalchemy.orm.util import (
_class_to_mapper, _state_has_identity, _state_mapper,
)
-from sqlalchemy.orm.mapper import Mapper
+from sqlalchemy.orm.mapper import Mapper, _none_set
from sqlalchemy.orm.unitofwork import UOWTransaction
from sqlalchemy.orm import identity
new_instance = False
state = attributes.instance_state(instance)
key = state.key
-
+
if key is None:
if dont_load:
raise sa_exc.InvalidRequestError(
"dont_load=True.")
key = mapper._identity_key_from_state(state)
- merged = None
- if key:
- if key in self.identity_map:
- merged = self.identity_map[key]
- elif dont_load:
- if state.modified:
- raise sa_exc.InvalidRequestError(
- "merge() with dont_load=True option does not support "
- "objects marked as 'dirty'. flush() all changes on "
- "mapped instances before merging with dont_load=True.")
- merged = mapper.class_manager.new_instance()
- merged_state = attributes.instance_state(merged)
- merged_state.key = key
- self._update_impl(merged_state)
- new_instance = True
- else:
- merged = self.query(mapper.class_).get(key[1])
-
+ if key in self.identity_map:
+ merged = self.identity_map[key]
+ elif dont_load:
+ if state.modified:
+ raise sa_exc.InvalidRequestError(
+ "merge() with dont_load=True option does not support "
+ "objects marked as 'dirty'. flush() all changes on "
+ "mapped instances before merging with dont_load=True.")
+ merged = mapper.class_manager.new_instance()
+ merged_state = attributes.instance_state(merged)
+ merged_state.key = key
+ self._update_impl(merged_state)
+ new_instance = True
+ elif not _none_set.issuperset(key[1]):
+ merged = self.query(mapper.class_).get(key[1])
+ else:
+ merged = None
+
if merged is None:
merged = mapper.class_manager.new_instance()
merged_state = attributes.instance_state(merged)
sess.expunge_all()
eq_(sess.query(User).first(), User(id=7, name='fred'))
+ @testing.resolve_artifact_names
+ def test_transient_to_pending_no_pk(self):
+ """test that a transient object with no PK attribute doesn't trigger a needless load."""
+ mapper(User, users)
+ sess = create_session()
+ u = User(name='fred')
+ def go():
+ sess.merge(u)
+ self.assert_sql_count(testing.db, go, 0)
+
@testing.resolve_artifact_names
def test_transient_to_pending_collection(self):
mapper(User, users, properties={