"0.4.7".
- orm
+ - The "entity_name" feature of SQLAlchemy mappers
+ has been removed. For rationale, see
+ http://tinyurl.com/6nm2ne
+
- The 'cascade' parameter to relation() accepts None
as a value, which is equivalent to no cascades.
#### Multiple Mappers for One Class {@name=multiple}
-The first mapper created for a certain class is known as that class's "primary mapper." Other mappers can be created as well, these come in two varieties.
-
-* **secondary mapper**
- this is a mapper that must be constructed with the keyword argument `non_primary=True`, and represents a load-only mapper. Objects that are loaded with a secondary mapper will have their save operation processed by the primary mapper. It is also invalid to add new `relation()`s to a non-primary mapper. To use this mapper with the Session, specify it to the `query` method:
+The first mapper created for a certain class is known as that class's "primary mapper." Other mappers can be created as well on the "load side" - these are called **secondary mappers**. A secondary mapper is constructed with the keyword argument `non_primary=True` and represents a load-only mapper; objects loaded with a secondary mapper have their save operations processed by the primary mapper. It is also invalid to add new `relation()`s to a non-primary mapper. To use a secondary mapper with the Session, specify it to the `query` method:
example:
# select
result = session.query(othermapper).select()
- The "non primary mapper" is a rarely needed feature of SQLAlchemy; in most cases, the `Query` object can produce any kind of query that's desired. It's recommended that a straight `Query` be used in place of a non-primary mapper unless the mapper approach is absolutely needed. Current use cases for the "non primary mapper" are when you want to map the class to a particular select statement or view to which additional query criterion can be added, and for when the particular mapped select statement or view is to be placed in a `relation()` of a parent mapper.
-
-* **entity name mapper**
- this is a mapper that is a fully functioning primary mapper for a class, which is distinguished from the regular primary mapper by an `entity_name` parameter. Instances loaded with this mapper will be totally managed by this new mapper and have no connection to the original one. Most methods on `Session` include an optional `entity_name` parameter in order to specify this condition.
-
- example:
-
- {python}
- # primary mapper
- mapper(User, users_table)
-
- # make an entity name mapper that stores User objects in another table
- mapper(User, alternate_users_table, entity_name='alt')
-
- # make two User objects
- user1 = User()
- user2 = User()
-
- # save one in in the "users" table
- session.save(user1)
-
- # save the other in the "alternate_users_table"
- session.save(user2, entity_name='alt')
-
- session.flush()
-
- # select from the alternate mapper
- session.query(User, entity_name='alt').select()
+The "non primary mapper" is a rarely needed feature of SQLAlchemy; in most cases, the `Query` object can produce any kind of query that's desired. It's recommended that a straight `Query` be used in place of a non-primary mapper unless the mapper approach is absolutely needed. Current use cases for the "non primary mapper" are when you want to map the class to a particular select statement or view to which additional query criterion can be added, and for when the particular mapped select statement or view is to be placed in a `relation()` of a parent mapper.
- Use the "entity name" mapper when different instances of the same class are persisted in completely different tables. The "entity name" approach can also perform limited levels of horizontal partitioning as well. A more comprehensive approach to horizontal partitioning is provided by the Sharding API.
+Versions of SQLAlchemy prior to 0.5 included another mapper flag called "entity_name"; as of version 0.5.0 this feature has been removed (it never worked very well).
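+
+For applications that previously used "entity_name" to persist a single class in more than one table, one possible replacement (a sketch only, with hypothetical table names, not part of this changeset) is to map a distinct class per table:
+
+    {python}
+    class User(object):
+        pass
+
+    # a separate class for the alternate table; note that subclassing
+    # User here would instead set up mapper inheritance, so an
+    # unrelated class (or a shared mixin) is used
+    class AltUser(object):
+        pass
+
+    mapper(User, users_table)
+    mapper(AltUser, alternate_users_table)
+
+    u1, u2 = User(), AltUser()
+    session.add(u1)
+    session.add(u2)
+    session.flush()
+
+    # each class is persisted in, and queried from, its own table
+    session.query(AltUser).all()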
#### Extending Mapper {@name=extending}
### Querying
-The `query()` function takes one or more *entities* and returns a new `Query` object which will issue mapper queries within the context of this Session. An entity is defined as a mapped class, a `Mapper` object, an orm-enabled *descriptor*, or an `AliasedClass` object (a future release will also include an `Entity` object for use with entity_name mappers).
+The `query()` function takes one or more *entities* and returns a new `Query` object which will issue mapper queries within the context of this Session. An entity is defined as a mapped class, a `Mapper` object, an orm-enabled *descriptor*, or an `AliasedClass` object.
{python}
# query from a class
"""
return PropertyLoader(argument, secondary=secondary, **kwargs)
-def dynamic_loader(argument, secondary=None, primaryjoin=None, secondaryjoin=None, entity_name=None,
+def dynamic_loader(argument, secondary=None, primaryjoin=None, secondaryjoin=None,
foreign_keys=None, backref=None, post_update=False, cascade=False, remote_side=None, enable_typechecks=True,
passive_deletes=False, order_by=None):
"""Construct a dynamically-loading mapper property.
from sqlalchemy.orm.dynamic import DynaLoader
return PropertyLoader(argument, secondary=secondary, primaryjoin=primaryjoin,
- secondaryjoin=secondaryjoin, entity_name=entity_name, foreign_keys=foreign_keys, backref=backref,
+ secondaryjoin=secondaryjoin, foreign_keys=foreign_keys, backref=backref,
post_update=post_update, cascade=cascade, remote_side=remote_side, enable_typechecks=enable_typechecks,
passive_deletes=passive_deletes, order_by=order_by,
strategy_class=DynaLoader)
The table to which the class is mapped, or None if this mapper
inherits from another mapper using concrete table inheritance.
- entity_name
- A name to be associated with the `class`, to allow alternate mappings
- for a single class.
-
always_refresh
If True, all query operations for this mapped class will overwrite all
data within object instances that already exist within the session,
ATTR_WAS_SET = util.symbol('ATTR_WAS_SET')
NO_VALUE = util.symbol('NO_VALUE')
NEVER_SET = util.symbol('NEVER_SET')
-NO_ENTITY_NAME = util.symbol('NO_ENTITY_NAME')
INSTRUMENTATION_MANAGER = '__sa_instrumentation_manager__'
"""Attribute, elects custom instrumentation when present on a mapped class.
session_id = None
key = None
runid = None
- entity_name = NO_ENTITY_NAME
expired_attributes = EMPTY_SET
-
+
def __init__(self, obj, manager):
self.class_ = obj.__class__
self.manager = manager
impl = self.get_impl(key)
x = impl.get(self, passive=passive)
if x is PASSIVE_NORESULT:
+
return None
elif hasattr(impl, 'get_collection'):
return impl.get_collection(self, x, passive=passive)
def __getstate__(self):
return {'key': self.key,
- 'entity_name': self.entity_name,
'committed_state': self.committed_state,
'pending': self.pending,
'parents': self.parents,
self.parents = state['parents']
self.key = state['key']
self.session_id = None
- self.entity_name = state['entity_name']
self.pending = state['pending']
self.modified = state['modified']
self.obj = weakref.ref(state['instance'])
self.class_ = class_
self.factory = None # where we came from, for inheritance bookkeeping
self.info = {}
- self.mappers = {}
+ self.mapper = None
self.mutable_attributes = set()
self.local_attrs = {}
self.originals = {}
manager.unregister()
def register_attribute(class_, key, uselist, useobject, callable_=None, proxy_property=None, mutable_scalars=False, impl_class=None, **kwargs):
-
manager = manager_of_class(class_)
if manager.is_instrumented(key):
- # this currently only occurs if two primary mappers are made for the
- # same class. TODO: possibly have InstrumentedAttribute check
- # "entity_name" when searching for impl. raise an error if two
- # attrs attached simultaneously otherwise
return
if uselist:
impl_class=impl_class,
**kwargs),
comparator=comparator, parententity=parententity)
-
+
manager.instrument_attribute(key, descriptor)
def unregister_attribute(class_, key):
for s in [elem for elem in uowcommit.session.identity_map.all_states()
if issubclass(elem.class_, self.parent.class_) and
self.key in elem.dict and
+ elem.dict[self.key] is not None and
attributes.instance_state(elem.dict[self.key]) in switchers
]:
uowcommit.register_object(s, listonly=self.passive_updates)
sync.populate(attributes.instance_state(s.dict[self.key]), self.mapper, s, self.parent, self.prop.synchronize_pairs)
- #self.syncrules.execute(s.dict[self.key]._state, s, None, None, False)
def _pks_changed(self, uowcommit, state):
return sync.source_changes(uowcommit, state, self.mapper, self.prop.synchronize_pairs)
class UnmappedInstanceError(UnmappedError):
"""An mapping operation was requested for an unknown instance."""
- def __init__(self, obj, entity_name=None, msg=None):
+ def __init__(self, obj, msg=None):
if not msg:
try:
- mapper = sa.orm.class_mapper(type(obj), entity_name)
+ mapper = sa.orm.class_mapper(type(obj))
name = _safe_cls_name(type(obj))
msg = ("Class %r is mapped, but this instance lacks "
"instrumentation. Possible causes: instance created "
"instance was pickled/depickled without instrumentation"
"information." % (name, name))
except UnmappedClassError:
- msg = _default_unmapped(type(obj), entity_name)
+ msg = _default_unmapped(type(obj))
if isinstance(obj, type):
msg += (
'; was a class (%s) supplied where an instance was '
class UnmappedClassError(UnmappedError):
"""An mapping operation was requested for an unknown class."""
- def __init__(self, cls, entity_name=None, msg=None):
+ def __init__(self, cls, msg=None):
if not msg:
- msg = _default_unmapped(cls, entity_name)
+ msg = _default_unmapped(cls)
UnmappedError.__init__(self, msg)
cls_name = repr(cls)
return cls_name
-def _default_unmapped(cls, entity_name):
+def _default_unmapped(cls):
try:
mappers = sa.orm.attributes.manager_of_class(cls).mappers
except NO_STATE:
mappers = {}
name = _safe_cls_name(cls)
- if not mappers and entity_name is None:
+ if not mappers:
return "Class '%s' is not mapped" % name
- else:
- return "Class '%s' is not mapped with entity_name %r" % (
- name, entity_name)
return None
return [
- (mapper.class_, mapper.entity_name, key)
+ (mapper.class_, key)
for mapper, key in [(path[i], path[i+1]) for i in range(0, len(path)-1, 2)]
]
from sqlalchemy.orm import class_mapper
return tuple(
- chain(*[(class_mapper(cls, entity), key) for cls, entity, key in path])
+ chain(*[(class_mapper(cls), key) for cls, key in path])
)
class MapperOption(object):
_expire_state = None
_state_session = None
-
class Mapper(object):
"""Define the correlation of class attributes to database table
columns.
inherit_foreign_keys = None,
extension = None,
order_by = False,
- entity_name = None,
always_refresh = False,
version_id_col = None,
polymorphic_on=None,
self.class_ = class_
self.class_manager = None
- self.entity_name = entity_name
+
self.primary_key_argument = primary_key
self.non_primary = non_primary
def __log(self, msg):
if self.__should_log_info:
- self.logger.info("(" + self.class_.__name__ + "|" + (self.entity_name is not None and "/%s" % self.entity_name or "") + (self.local_table and self.local_table.description or str(self.local_table)) + (self.non_primary and "|non-primary" or "") + ") " + msg)
+ self.logger.info("(" + self.class_.__name__ + "|" + (self.local_table and self.local_table.description or str(self.local_table)) + (self.non_primary and "|non-primary" or "") + ") " + msg)
def __log_debug(self, msg):
if self.__should_log_debug:
- self.logger.debug("(" + self.class_.__name__ + "|" + (self.entity_name is not None and "/%s" % self.entity_name or "") + (self.local_table and self.local_table.description or str(self.local_table)) + (self.non_primary and "|non-primary" or "") + ") " + msg)
+ self.logger.debug("(" + self.class_.__name__ + "|" + (self.local_table and self.local_table.description or str(self.local_table)) + (self.non_primary and "|non-primary" or "") + ") " + msg)
def _is_orphan(self, state):
o = False
self.compiled = True
manager = self.class_manager
- mappers = manager.mappers
- if not self.non_primary and self.entity_name in mappers:
- del mappers[self.entity_name]
- if not mappers and manager.info.get(_INSTRUMENTOR, False):
+ if not self.non_primary and manager.mapper is self:
+ manager.mapper = None
manager.events.remove_listener('on_init', _event_on_init)
manager.events.remove_listener('on_init_failure',
_event_on_init_failure)
manager = attributes.manager_of_class(self.class_)
if self.non_primary:
- if not manager or None not in manager.mappers:
+ if not manager or manager.mapper is None:
raise sa_exc.InvalidRequestError(
"Class %s has no primary mapper configured. Configure "
"a primary mapper first before setting up a non primary "
if manager is not None:
if manager.class_ is not self.class_:
# An inherited manager. Install one for this subclass.
+ raise "eh?"
manager = None
- elif self.entity_name in manager.mappers:
+ elif manager.mapper:
raise sa_exc.ArgumentError(
- "Class '%s' already has a primary mapper defined "
- "with entity name '%s'. Use non_primary=True to "
+ "Class '%s' already has a primary mapper defined. "
+ "Use non_primary=True to "
"create a non primary Mapper. clear_mappers() will "
"remove *all* current mappers from all classes." %
- (self.class_, self.entity_name))
+ self.class_)
_mapper_registry[self] = True
self.class_manager = manager
- has_been_initialized = bool(manager.info.get(_INSTRUMENTOR, False))
- manager.mappers[self.entity_name] = self
+ manager.mapper = self
# The remaining members can be added by any mapper, e_name None or not.
- if has_been_initialized:
+ if manager.info.get(_INSTRUMENTOR, False):
return
self.extension.instrument_class(self, self.class_)
self.extension.on_reconstitute(self, instance)
event_registry.add_listener('on_load', reconstitute)
-
manager.info[_INSTRUMENTOR] = self
def common_parent(self, other):
id(self), self.class_.__name__)
def __str__(self):
- return "Mapper|" + self.class_.__name__ + "|" + (self.entity_name is not None and "/%s" % self.entity_name or "") + (self.local_table and self.local_table.description or str(self.local_table)) + (self.non_primary and "|non-primary" or "")
+ return "Mapper|" + self.class_.__name__ + "|" + (self.local_table and self.local_table.description or str(self.local_table)) + (self.non_primary and "|non-primary" or "")
def primary_mapper(self):
- """Return the primary mapper corresponding to this mapper's class key (class + entity_name)."""
- return self.class_manager.mappers[self.entity_name]
+ """Return the primary mapper corresponding to this mapper's class key (class)."""
+ return self.class_manager.mapper
def identity_key_from_row(self, row, adapter=None):
"""Return an identity-map key for use in storing/retrieving an
if adapter:
pk_cols = [adapter.columns[c] for c in pk_cols]
- return (self._identity_class, tuple(row[column] for column in pk_cols), self.entity_name)
+ return (self._identity_class, tuple(row[column] for column in pk_cols))
def identity_key_from_primary_key(self, primary_key):
"""Return an identity-map key for use in storing/retrieving an
primary_key
A list of values indicating the identifier.
"""
- return (self._identity_class, tuple(util.to_list(primary_key)), self.entity_name)
+ return (self._identity_class, tuple(util.to_list(primary_key)))
def identity_key_from_instance(self, instance):
"""Return the identity key for the given instance, based on
if version_id_col:
version_id_col = adapter.columns[version_id_col]
- identity_class, entity_name = self._identity_class, self.entity_name
+ identity_class = self._identity_class
def identity_key(row):
- return (identity_class, tuple(row[column] for column in pk_cols), entity_name)
+ return (identity_class, tuple(row[column] for column in pk_cols))
new_populators = []
existing_populators = []
self.__log_debug("_instance(): created new instance %s identity %s" % (instance_str(instance), str(identitykey)))
state = attributes.instance_state(instance)
- state.entity_name = self.entity_name
state.key = identitykey
+
# manually adding instance to session. for a complete add,
# session._finalize_loaded() must be called.
state.session_id = context.session.hash_key
def __init__(self, argument,
secondary=None, primaryjoin=None,
- secondaryjoin=None, entity_name=None,
+ secondaryjoin=None,
foreign_keys=None,
uselist=None,
order_by=False,
self.uselist = uselist
self.argument = argument
- self.entity_name = entity_name
self.secondary = secondary
self.primaryjoin = primaryjoin
self.secondaryjoin = secondaryjoin
dest_list = []
for current in instances:
_recursive[(current, self)] = True
- obj = session.merge(current, entity_name=self.mapper.entity_name, dont_load=dont_load, _recursive=_recursive)
+ obj = session.merge(current, dont_load=dont_load, _recursive=_recursive)
if obj is not None:
dest_list.append(obj)
if dont_load:
current = instances[0]
if current is not None:
_recursive[(current, self)] = True
- obj = session.merge(current, entity_name=self.mapper.entity_name, dont_load=dont_load, _recursive=_recursive)
+ obj = session.merge(current, dont_load=dont_load, _recursive=_recursive)
if obj is not None:
if dont_load:
dest.__dict__[self.key] = obj
visited_instances.add(c)
# cascade using the mapper local to this object, so that its individual properties are located
- instance_mapper = object_mapper(c, entity_name=mapper.entity_name)
+ instance_mapper = object_mapper(c)
yield (c, instance_mapper, attributes.instance_state(c))
def _get_target_class(self):
def _determine_targets(self):
if isinstance(self.argument, type):
- self.mapper = mapper.class_mapper(self.argument, entity_name=self.entity_name, compile=False)
+ self.mapper = mapper.class_mapper(self.argument, compile=False)
elif isinstance(self.argument, mapper.Mapper):
self.mapper = self.argument
elif callable(self.argument):
# accept a callable to suit various deferred-configurational schemes
- self.mapper = mapper.class_mapper(self.argument(), entity_name=self.entity_name, compile=False)
+ self.mapper = mapper.class_mapper(self.argument(), compile=False)
else:
raise sa_exc.ArgumentError("relation '%s' expects a class or a mapper argument (received: %s)" % (self.key, type(self.argument)))
assert isinstance(self.mapper, mapper.Mapper), self.mapper
class Query(object):
"""Encapsulates the object-fetching operations provided by Mappers."""
- def __init__(self, entities, session=None, entity_name=None):
+ def __init__(self, entities, session=None):
self.session = session
self._with_options = []
self.__currenttables = set()
for ent in util.to_list(entities):
- _QueryEntity(self, ent, entity_name=entity_name)
+ _QueryEntity(self, ent)
self.__setup_aliasizers(self._entities)
for ent in entities:
for entity in ent.entities:
if entity not in d:
- mapper, selectable, is_aliased_class = _entity_info(entity, ent.entity_name)
+ mapper, selectable, is_aliased_class = _entity_info(entity)
if not is_aliased_class and mapper.with_polymorphic:
with_polymorphic = mapper._with_polymorphic_mappers
self.__mapper_loads_polymorphically_with(mapper, sql_util.ColumnAdapter(selectable, mapper._equivalent_columns))
target_cls = self._mapper_zero().class_
#TODO: detect when the where clause is a trivial primary key match
- objs_to_expunge = [obj for (cls, pk, entity_name),obj in session.identity_map.iteritems()
+ objs_to_expunge = [obj for (cls, pk),obj in session.identity_map.iteritems()
if issubclass(cls, target_cls) and eval_condition(obj)]
for obj in objs_to_expunge:
session._remove_newly_deleted(attributes.instance_state(obj))
if synchronize_session == 'evaluate':
target_cls = self._mapper_zero().class_
- for (cls, pk, entity_name),obj in session.identity_map.iteritems():
+ for (cls, pk),obj in session.identity_map.iteritems():
evaluated_keys = value_evaluators.keys()
if issubclass(cls, target_cls) and eval_condition(obj):
class _MapperEntity(_QueryEntity):
"""mapper/class/AliasedClass entity"""
- def __init__(self, query, entity, entity_name=None):
+ def __init__(self, query, entity):
self.primary_entity = not query._entities
query._entities.append(self)
self.entities = [entity]
self.entity_zero = entity
- self.entity_name = entity_name
def setup_entity(self, entity, mapper, adapter, from_obj, is_aliased_class, with_polymorphic):
self.mapper = mapper
class _ColumnEntity(_QueryEntity):
"""Column/expression based entity."""
- def __init__(self, query, column, entity_name=None):
+ def __init__(self, query, column):
if isinstance(column, expression.FromClause) and not isinstance(column, expression.ColumnElement):
for c in column.c:
_ColumnEntity(query, c)
column = column.label(None)
self.column = column
- self.entity_name = None
self.froms = set()
self.entities = util.OrderedSet(
elem._annotations['parententity']
def init_instance(self, mapper, class_, oldinit, instance, args, kwargs):
if self.save_on_init:
- entity_name = kwargs.pop('_sa_entity_name', None)
session = kwargs.pop('_sa_session', None)
if self.set_kwargs_on_init:
if self.save_on_init:
session = session or self.context.registry()
- session._save_without_cascade(instance, entity_name=entity_name)
+ session._save_without_cascade(instance)
return EXT_CONTINUE
def init_failed(self, mapper, class_, oldinit, instance, args, kwargs):
# TODO: need much more test coverage for bind_mapper() and similar !
# TODO: + crystalize + document resolution order vis. bind_mapper/bind_table
- def bind_mapper(self, mapper, bind, entity_name=None):
+ def bind_mapper(self, mapper, bind):
"""Bind operations for a mapper to a Connectable.
mapper
bind
Any Connectable: a ``Engine`` or ``Connection``.
- entity_name
- Defaults to None.
-
All subsequent operations involving this mapper will use the given
`bind`.
"""
if isinstance(mapper, type):
- mapper = _class_mapper(mapper, entity_name=entity_name)
+ mapper = _class_mapper(mapper)
self.__binds[mapper.base_mapper] = bind
for t in mapper._all_tables:
del state.session_id
@util.pending_deprecation('0.5.x', "Use session.add()")
- def save(self, instance, entity_name=None):
+ def save(self, instance):
"""Add a transient (unsaved) instance to this ``Session``.
This operation cascades the `save_or_update` method to associated
instances if the relation is mapped with ``cascade="save-update"``.
- The `entity_name` keyword argument will further qualify the specific
- ``Mapper`` used to handle this instance.
"""
- state = _state_for_unsaved_instance(instance, entity_name)
+ state = _state_for_unsaved_instance(instance)
self._save_impl(state)
- self._cascade_save_or_update(state, entity_name)
+ self._cascade_save_or_update(state)
- def _save_without_cascade(self, instance, entity_name=None):
+ def _save_without_cascade(self, instance):
"""Used by scoping.py to save on init without cascade."""
- state = _state_for_unsaved_instance(instance, entity_name, create=True)
+ state = _state_for_unsaved_instance(instance, create=True)
self._save_impl(state)
@util.pending_deprecation('0.5.x', "Use session.add()")
- def update(self, instance, entity_name=None):
+ def update(self, instance):
"""Bring a detached (saved) instance into this ``Session``.
If there is a persistent instance with the same instance key, but
try:
state = attributes.instance_state(instance)
except exc.NO_STATE:
- raise exc.UnmappedInstanceError(instance, entity_name)
+ raise exc.UnmappedInstanceError(instance)
self._update_impl(state)
- self._cascade_save_or_update(state, entity_name)
+ self._cascade_save_or_update(state)
- def add(self, instance, entity_name=None):
+ def add(self, instance):
"""Add the given instance into this ``Session``.
TODO: rephrase the below in user terms; possibly tie into future
to ``save()`` or ``update()`` the instance.
"""
- state = _state_for_unknown_persistence_instance(instance, entity_name)
- self._save_or_update_state(state, entity_name)
+ state = _state_for_unknown_persistence_instance(instance)
+ self._save_or_update_state(state)
def add_all(self, instances):
"""Add the given collection of instances to this ``Session``."""
for instance in instances:
self.add(instance)
- def _save_or_update_state(self, state, entity_name):
+ def _save_or_update_state(self, state):
self._save_or_update_impl(state)
- self._cascade_save_or_update(state, entity_name)
+ self._cascade_save_or_update(state)
save_or_update = (
util.pending_deprecation('0.5.x', "Use session.add()")(add))
- def _cascade_save_or_update(self, state, entity_name):
+ def _cascade_save_or_update(self, state):
for state, mapper in _cascade_unknown_state_iterator('save-update', state, halt_on=lambda c:c in self):
self._save_or_update_impl(state)
for state, m, o in cascade_states:
self._delete_impl(state, ignore_transient=True)
- def merge(self, instance, entity_name=None, dont_load=False,
+ def merge(self, instance, dont_load=False,
_recursive=None):
"""Copy the state an instance onto the persistent instance with the same identifier.
# TODO: this should be an IdentityDict for instances, but will
# need a separate dict for PropertyLoader tuples
_recursive = {}
- if entity_name is not None:
- mapper = _class_mapper(instance.__class__, entity_name=entity_name)
- else:
- mapper = _object_mapper(instance)
+ mapper = _object_mapper(instance)
if instance in _recursive:
return _recursive[instance]
merged = mapper.class_manager.new_instance()
merged_state = attributes.instance_state(merged)
merged_state.key = key
- merged_state.entity_name = entity_name
self._update_impl(merged_state)
new_instance = True
else:
merged = mapper.class_manager.new_instance()
merged_state = attributes.instance_state(merged)
new_instance = True
- self.save(merged, entity_name=mapper.entity_name)
+ self.save(merged)
_recursive[instance] = merged
def _cascade_unknown_state_iterator(cascade, state, **kwargs):
mapper = _state_mapper(state)
for (o, m) in mapper.cascade_iterator(cascade, state, **kwargs):
- yield _state_for_unknown_persistence_instance(o, m.entity_name), m
+ yield _state_for_unknown_persistence_instance(o), m
-def _state_for_unsaved_instance(instance, entity_name, create=False):
+def _state_for_unsaved_instance(instance, create=False):
manager = attributes.manager_of_class(instance.__class__)
if manager is None:
- raise exc.UnmappedInstanceError(instance, entity_name)
+ raise exc.UnmappedInstanceError(instance)
if manager.has_state(instance):
state = manager.state_of(instance)
if state.key is not None:
elif create:
state = manager.setup_instance(instance)
else:
- raise exc.UnmappedInstanceError(instance, entity_name)
- state.entity_name = entity_name
+ raise exc.UnmappedInstanceError(instance)
+
return state
-def _state_for_unknown_persistence_instance(instance, entity_name):
+def _state_for_unknown_persistence_instance(instance):
try:
state = attributes.instance_state(instance)
except exc.NO_STATE:
- raise exc.UnmappedInstanceError(instance, entity_name)
- state.entity_name = entity_name
+ raise exc.UnmappedInstanceError(instance)
+
return state
def object_session(instance):
self.parent_property._get_strategy(ColumnLoader).setup_query(context, entity, path, adapter, **kwargs)
def class_level_loader(self, state, props=None):
- if not mapperutil._state_has_mapper(state):
+ if not mapperutil._state_has_identity(state):
return None
localparent = mapper._state_mapper(state)
# adjust for the ColumnProperty associated with the instance
- # not being our own ColumnProperty. This can occur when entity_name
- # mappers are used to map different versions of the same ColumnProperty
- # to the class.
+ # not being our own ColumnProperty.
+ # TODO: this may no longer be relevant without entity_name.
prop = localparent.get_property(self.key)
if prop is not self.parent_property:
return prop._get_strategy(DeferredColumnLoader).setup_loader(state)
return visitors.cloned_traverse(criterion, {}, {'binary':visit_binary})
def class_level_loader(self, state, options=None, path=None):
- if not mapperutil._state_has_mapper(state):
+ if not mapperutil._state_has_identity(state):
return None
localparent = mapper._state_mapper(state)
# adjust for the PropertyLoader associated with the instance
- # not being our own PropertyLoader. This can occur when entity_name
- # mappers are used to map different versions of the same PropertyLoader
- # to the class.
+ # not being our own PropertyLoader.
+ # TODO: this may no longer be relevant without entity_name
prop = localparent.get_property(self.key)
if prop is not self.parent_property:
return prop._get_strategy(LazyLoader).setup_loader(state)
if sess:
prop = _state_mapper(state).get_property(self.key)
if prop.cascade.save_update and item not in sess:
- sess.save_or_update(item, entity_name=prop.mapper.entity_name)
+ sess.save_or_update(item)
def remove(self, state, item, initiator):
sess = _state_session(state)
if sess:
prop = _state_mapper(state).get_property(self.key)
if newvalue is not None and prop.cascade.save_update and newvalue not in sess:
- sess.save_or_update(newvalue, entity_name=prop.mapper.entity_name)
+ sess.save_or_update(newvalue)
if prop.cascade.delete_orphan and oldvalue in sess.new:
sess.expunge(oldvalue)
def _repr_task(self, task):
if task.mapper is not None:
if task.mapper.__class__.__name__ == 'Mapper':
- name = task.mapper.class_.__name__ + "/" + task.mapper.local_table.description + "/" + str(task.mapper.entity_name)
+ name = task.mapper.class_.__name__ + "/" + task.mapper.local_table.description
else:
name = repr(task.mapper)
else:
Valid call signatures:
- * ``identity_key(class, ident, entity_name=None)``
+ * ``identity_key(class, ident)``
class
mapped class (must be a positional argument)
ident
primary key, if the key is composite this is a tuple
- entity_name
- optional entity name
* ``identity_key(instance=instance)``
instance
object instance (must be given as a keyword arg)
- * ``identity_key(class, row=row, entity_name=None)``
+ * ``identity_key(class, row=row)``
class
mapped class (must be a positional argument)
row
result proxy row (must be given as a keyword arg)
- entity_name
- optional entity name (must be given as a keyword arg)
"""
if args:
if len(args) == 1:
row = kwargs.pop("row")
except KeyError:
ident = kwargs.pop("ident")
- entity_name = kwargs.pop("entity_name", None)
elif len(args) == 2:
class_, ident = args
- entity_name = kwargs.pop("entity_name", None)
- elif len(args) == 3:
- class_, ident, entity_name = args
else:
- raise sa_exc.ArgumentError("expected up to three "
- "positional arguments, got %s" % len(args))
+ raise sa_exc.ArgumentError("expected up to two "
+ "positional arguments, got %s" % len(args))
if kwargs:
raise sa_exc.ArgumentError("unknown keyword arguments: %s"
% ", ".join(kwargs.keys()))
- mapper = class_mapper(class_, entity_name=entity_name)
+ mapper = class_mapper(class_)
if "ident" in locals():
return mapper.identity_key_from_primary_key(ident)
return mapper.identity_key_from_row(row)
return prop.compare(operators.eq, instance, value_is_parent=True)
-def _entity_info(entity, entity_name=None, compile=True):
+def _entity_info(entity, compile=True):
if isinstance(entity, AliasedClass):
return entity._AliasedClass__mapper, entity._AliasedClass__alias, True
elif _is_mapped_class(entity):
if isinstance(entity, type):
- mapper = class_mapper(entity, entity_name, compile)
+ mapper = class_mapper(entity, compile)
else:
if compile:
mapper = entity.compile()
def _is_aliased_class(entity):
return isinstance(entity, AliasedClass)
-def _state_mapper(state, entity_name=None):
- if state.entity_name is not attributes.NO_ENTITY_NAME:
- # Override the given entity name if the object is not transient.
- entity_name = state.entity_name
- return state.manager.mappers[entity_name]
+def _state_mapper(state):
+ return state.manager.mapper
-def object_mapper(object, entity_name=None, raiseerror=True):
+def object_mapper(object, raiseerror=True):
"""Given an object, return the primary Mapper associated with the object instance.
object
The object instance.
- entity_name
- Entity name of the mapper to retrieve, if the given instance is
- transient. Otherwise uses the entity name already associated
- with the instance.
-
raiseerror
Defaults to True: raise an ``InvalidRequestError`` if no mapper can
be located. If False, return None.
except exc.NO_STATE:
if not raiseerror:
return None
- raise exc.UnmappedInstanceError(object, entity_name)
- if state.entity_name is not attributes.NO_ENTITY_NAME:
- # Override the given entity name if the object is not transient.
- entity_name = state.entity_name
+ raise exc.UnmappedInstanceError(object)
return class_mapper(
- type(object), entity_name=entity_name,
- compile=False, raiseerror=raiseerror)
+ type(object), compile=False, raiseerror=raiseerror)
-def class_mapper(class_, entity_name=None, compile=True, raiseerror=True):
- """Given a class (or an object) and optional entity_name, return the primary Mapper associated with the key.
+def class_mapper(class_, compile=True, raiseerror=True):
+ """Given a class (or an object), return the primary Mapper associated with the key.
If no mapper can be located, raises ``InvalidRequestError``.
class_ = type(class_)
try:
class_manager = attributes.manager_of_class(class_)
- mapper = class_manager.mappers[entity_name]
+ mapper = class_manager.mapper
except exc.NO_STATE:
if not raiseerror:
return
- raise exc.UnmappedClassError(class_, entity_name)
- raise sa_exc.InvalidRequestError(
- "Class '%s' entity name '%s' has no mapper associated with it" %
- (class_.__name__, entity_name))
+ raise exc.UnmappedClassError(class_)
if compile:
mapper = mapper.compile()
return mapper
-def _class_to_mapper(class_or_mapper, entity_name=None, compile=True):
+def _class_to_mapper(class_or_mapper, compile=True):
if _is_aliased_class(class_or_mapper):
return class_or_mapper._AliasedClass__mapper
elif isinstance(class_or_mapper, type):
- return class_mapper(class_or_mapper, entity_name=entity_name, compile=compile)
+ return class_mapper(class_or_mapper, compile=compile)
elif hasattr(class_or_mapper, 'compile'):
if compile:
return class_or_mapper.compile()
else:
return class_or_mapper
else:
- raise exc.UnmappedClassError(class_or_mapper, entity_name)
+ raise exc.UnmappedClassError(class_or_mapper)
def has_identity(object):
state = attributes.instance_state(object)
def _state_has_identity(state):
return bool(state.key)
-def has_mapper(object):
- state = attributes.instance_state(object)
- return _state_has_mapper(state)
-
-def _state_has_mapper(state):
- return state.entity_name is not attributes.NO_ENTITY_NAME
-
def _is_mapped_class(cls):
from sqlalchemy.orm import mapperlib as mapper
if isinstance(cls, (AliasedClass, mapper.Mapper)):
'orm.cycles',
- 'orm.entity',
'orm.compile',
'orm.manytomany',
'orm.onetoone',
except orm_exc.FlushError, e:
assert "is an orphan" in str(e)
+
@testing.resolve_artifact_names
def test_delete(self):
sess = create_session()
assert users.count().scalar() == 1
assert orders.count().scalar() == 0
+
+class O2MBackrefTest(_fixtures.FixtureTest):
+ run_inserts = None
+
+ @testing.resolve_artifact_names
+ def setup_mappers(self):
+ mapper(User, users, properties = dict(
+ orders = relation(
+ mapper(Order, orders), cascade="all, delete-orphan", backref="user")
+ ))
+
+ @testing.resolve_artifact_names
+ def test_lazyload_bug(self):
+ sess = create_session()
+
+ u = User(name="jack")
+ sess.add(u)
+ sess.expunge(u)
+
+ o1 = Order(description='someorder')
+ o1.user = u
+ sess.add(u)
+ assert u in sess
+ assert o1 in sess
+
+
class NoSaveCascadeTest(_fixtures.FixtureTest):
"""test that backrefs don't force save-update cascades to occur
when the cascade initiated from the forwards side."""
closedorders = sa.alias(orders, 'closedorders')
mapper(Address, addresses)
-
+ mapper(Order, orders)
+
+ open_mapper = mapper(Order, openorders, non_primary=True)
+ closed_mapper = mapper(Order, closedorders, non_primary=True)
+
mapper(User, users, properties = dict(
addresses = relation(Address, lazy=False),
open_orders = relation(
- mapper(Order, openorders, entity_name='open'),
+ open_mapper,
primaryjoin=sa.and_(openorders.c.isopen == 1,
users.c.id==openorders.c.user_id),
lazy=False),
closed_orders = relation(
- mapper(Order, closedorders,entity_name='closed'),
+ closed_mapper,
primaryjoin=sa.and_(closedorders.c.isopen == 0,
users.c.id==closedorders.c.user_id),
lazy=False)))
+++ /dev/null
-import testenv; testenv.configure_for_tests()
-from testlib import sa, testing
-from testlib.sa import Table, Column, Integer, String, ForeignKey
-from testlib.sa.orm import mapper, relation, backref, create_session
-from testlib.testing import eq_
-from orm import _base
-
-
-class EntityTest(_base.MappedTest):
- """Mappers scoped to an entity_name"""
-
- def define_tables(self, metadata):
- Table('user1', metadata,
- Column('user_id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('name', String(60), nullable=False))
-
- Table('user2', metadata,
- Column('user_id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('name', String(60), nullable=False))
-
- Table('address1', metadata,
- Column('address_id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('user_id', Integer, ForeignKey('user1.user_id'),
- nullable=False),
- Column('email', String(100), nullable=False))
-
- Table('address2', metadata,
- Column('address_id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('user_id', Integer, ForeignKey('user2.user_id'),
- nullable=False),
- Column('email', String(100), nullable=False))
-
- def setup_classes(self):
- class User(_base.BasicEntity):
- pass
-
- class Address(_base.BasicEntity):
- pass
-
- class Address1(_base.BasicEntity):
- pass
-
- class Address2(_base.BasicEntity):
- pass
-
- @testing.resolve_artifact_names
- def test_entity_name_assignment_by_extension(self):
- """
- Tests a pair of one-to-many mapper structures, establishing that both
- parent and child objects honor the "entity_name" attribute attached to
- the object instances.
-
- """
- ctx = sa.orm.scoped_session(create_session)
-
- ma1 = mapper(Address, address1, entity_name='address1',
- extension=ctx.extension)
- ma2 = mapper(Address, address2, entity_name='address2',
- extension=ctx.extension)
-
- mapper(User, user1, entity_name='user1',
- extension=ctx.extension,
- properties=dict(addresses=relation(ma1)))
-
- mapper(User, user2, entity_name='user2',
- extension=ctx.extension,
- properties=dict(addresses=relation(ma2)))
-
- u1 = User(name='this is user 1', _sa_entity_name='user1')
- a1 = Address(email='a1@foo.com', _sa_entity_name='address1')
- u1.addresses.append(a1)
-
- u2 = User(name='this is user 2', _sa_entity_name='user2')
- a2 = Address(email='a2@foo.com', _sa_entity_name='address2')
- u2.addresses.append(a2)
-
- ctx.flush()
- eq_(user1.select().execute().fetchall(), [(u1.user_id, u1.name)])
- eq_(user2.select().execute().fetchall(), [(u2.user_id, u2.name)])
- eq_(address1.select().execute().fetchall(),
- [(a1.address_id, u1.user_id, 'a1@foo.com')])
- eq_(address2.select().execute().fetchall(),
- [(a1.address_id, u2.user_id, 'a2@foo.com')])
-
- ctx.clear()
- u1list = ctx.query(User, entity_name='user1').all()
- u2list = ctx.query(User, entity_name='user2').all()
- eq_(len(u1list), 1)
- eq_(len(u2list), 1)
- assert u1list[0] is not u2list[0]
-
- eq_(len(u1list[0].addresses), 1)
- eq_(len(u2list[0].addresses), 1)
-
- u1 = ctx.query(User, entity_name='user1').first()
- ctx.refresh(u1)
- ctx.expire(u1)
-
- @testing.resolve_artifact_names
- def test_cascade(self):
- ma1 = mapper(Address, address1, entity_name='address1')
- ma2 = mapper(Address, address2, entity_name='address2')
-
- mapper(User, user1, entity_name='user1', properties=dict(
- addresses=relation(ma1)))
- mapper(User, user2, entity_name='user2', properties=dict(
- addresses=relation(ma2)))
-
- sess = create_session()
- u1 = User(name='this is user 1')
- sess.add(u1, entity_name='user1')
-
- a1 = Address(email='a1@foo.com')
- u1.addresses.append(a1)
-
- u2 = User(name='this is user 2')
- a2 = Address(email='a2@foo.com')
- u2.addresses.append(a2)
- sess.add(u2, entity_name='user2')
-
- sess.flush()
- eq_(user1.select().execute().fetchall(), [(u1.user_id, u1.name)])
- eq_(user2.select().execute().fetchall(), [(u2.user_id, u2.name)])
- eq_(address1.select().execute().fetchall(),
- [(a1.address_id, u1.user_id, 'a1@foo.com')])
- eq_(address2.select().execute().fetchall(),
- [(a1.address_id, u2.user_id, 'a2@foo.com')])
-
- sess.clear()
- u1list = sess.query(User, entity_name='user1').all()
- u2list = sess.query(User, entity_name='user2').all()
- eq_(len(u1list), 1)
- eq_(len(u2list), 1)
- assert u1list[0] is not u2list[0]
- eq_(len(u1list[0].addresses), 1)
- eq_(len(u2list[0].addresses), 1)
-
- @testing.resolve_artifact_names
- def test_polymorphic(self):
- """entity_name can be used to have two kinds of relations on the same class."""
- ctx = sa.orm.scoped_session(create_session)
-
- ma1 = mapper(Address1, address1, extension=ctx.extension)
- ma2 = mapper(Address2, address2, extension=ctx.extension)
-
- mapper(User, user1, entity_name='user1', extension=ctx.extension,
- properties=dict(
- addresses=relation(ma1)))
-
- mapper(User, user2, entity_name='user2', extension=ctx.extension,
- properties=dict(
- addresses=relation(ma2)))
-
- u1 = User(name='this is user 1', _sa_entity_name='user1')
- a1 = Address1(email='a1@foo.com')
- u1.addresses.append(a1)
-
- u2 = User(name='this is user 2', _sa_entity_name='user2')
- a2 = Address2(email='a2@foo.com')
- u2.addresses.append(a2)
-
- ctx.flush()
- eq_(user1.select().execute().fetchall(), [(u1.user_id, u1.name)])
- eq_(user2.select().execute().fetchall(), [(u2.user_id, u2.name)])
- eq_(address1.select().execute().fetchall(),
- [(a1.address_id, u1.user_id, 'a1@foo.com')])
- eq_(address2.select().execute().fetchall(),
- [(a1.address_id, u2.user_id, 'a2@foo.com')])
-
- ctx.clear()
- u1list = ctx.query(User, entity_name='user1').all()
- u2list = ctx.query(User, entity_name='user2').all()
- eq_(len(u1list), 1)
- eq_(len(u2list), 1)
- assert u1list[0] is not u2list[0]
- eq_(len(u1list[0].addresses), 1)
- eq_(len(u2list[0].addresses), 1)
-
- # the lazy load requires that setup_loader() check that the correct
- # LazyLoader is setting up for each load
- assert isinstance(u1list[0].addresses[0], Address1)
- assert isinstance(u2list[0].addresses[0], Address2)
-
- @testing.resolve_artifact_names
- def testpolymorphic_deferred(self):
- """Deferred columns load properly using entity names"""
-
- mapper(User, user1, entity_name='user1', properties=dict(
- name=sa.orm.deferred(user1.c.name)))
- mapper(User, user2, entity_name='user2', properties=dict(
- name=sa.orm.deferred(user2.c.name)))
-
- u1 = User(name='this is user 1')
- u2 = User(name='this is user 2')
-
- session = create_session()
- session.add(u1, entity_name='user1')
- session.add(u2, entity_name='user2')
- session.flush()
-
- eq_(user1.select().execute().fetchall(), [(u1.user_id, u1.name)])
- eq_(user2.select().execute().fetchall(), [(u2.user_id, u2.name)])
-
- session.clear()
- u1list = session.query(User, entity_name='user1').all()
- u2list = session.query(User, entity_name='user2').all()
-
- eq_(len(u1list), 1)
- eq_(len(u2list), 1)
- assert u1list[0] is not u2list[0]
-
- # the deferred column load requires that setup_loader() check that the
- # correct DeferredColumnLoader is setting up for each load
- eq_(u1list[0].name, 'this is user 1')
- eq_(u2list[0].name, 'this is user 2')
-
-
-class SelfReferentialTest(_base.MappedTest):
- def define_tables(self, metadata):
- Table('nodes', metadata,
- Column('id', Integer, primary_key=True),
- Column('parent_id', Integer, ForeignKey('nodes.id')),
- Column('data', String(50)),
- Column('type', String(50)))
-
- def setup_classes(self):
- class Node(_base.ComparableEntity):
- pass
-
- # fails inconsistently. entity name needs deterministic instrumentation.
- @testing.resolve_artifact_names
- def FIXME_test_relation(self):
- foonodes = nodes.select().where(nodes.c.type=='foo').alias()
- barnodes = nodes.select().where(nodes.c.type=='bar').alias()
-
- # TODO: the order of instrumentation here is not deterministic;
- # therefore the test fails sporadically since "Node.data" references
- # different mappers at different times
- m1 = mapper(Node, nodes)
- m2 = mapper(Node, foonodes, entity_name='foo')
- m3 = mapper(Node, barnodes, entity_name='bar')
-
- m1.add_property('foonodes', relation(
- m2,
- primaryjoin=nodes.c.id == foonodes.c.parent_id,
- backref=backref('foo_parent',
- remote_side=nodes.c.id,
- primaryjoin=nodes.c.id==foonodes.c.parent_id)))
-
- m1.add_property('barnodes', relation(
- m3,
- primaryjoin=nodes.c.id==barnodes.c.parent_id,
- backref=backref('bar_parent',
- remote_side=nodes.c.id,
- primaryjoin=nodes.c.id==barnodes.c.parent_id)))
-
- sess = create_session()
-
- n1 = Node(data='n1', type='bat')
- n1.foonodes.append(Node(data='n2', type='foo'))
- Node(data='n3', type='bar', bar_parent=n1)
- sess.add(n1)
- sess.flush()
- sess.clear()
-
- eq_(sess.query(Node, entity_name="bar").one(),
- Node(data='n3'))
- eq_(sess.query(Node).filter(Node.data=='n1').one(),
- Node(data='n1',
- foonodes=[Node(data='n2')],
- barnodes=[Node(data='n3')]))
-
-
-if __name__ == "__main__":
- testenv.main()
session.add(a)
assert b in session, 'base: %s' % base
- def test_compileonattr_rel_entity_name(self):
- m = MetaData()
- t1 = Table('t1', m,
- Column('id', Integer, primary_key=True),
- Column('x', Integer))
- t2 = Table('t2', m,
- Column('id', Integer, primary_key=True),
- Column('t1_id', Integer, ForeignKey('t1.id')))
- class A(object): pass
- class B(object): pass
- mapper(A, t1, properties=dict(bs=relation(B)), entity_name='x')
- mapper(B, t2)
-
- a = A()
- assert not a.bs
class FinderTest(_base.ORMTest):
def test_standard(self):
@testing.resolve_artifact_names
def test_double(self):
"""tests lazy loading with two relations simulatneously, from the same table, using aliases. """
+
openorders = sa.alias(orders, 'openorders')
closedorders = sa.alias(orders, 'closedorders')
mapper(Address, addresses)
-
+
+ mapper(Order, orders)
+
+ open_mapper = mapper(Order, openorders, non_primary=True)
+ closed_mapper = mapper(Order, closedorders, non_primary=True)
mapper(User, users, properties = dict(
addresses = relation(Address, lazy = True),
- open_orders = relation(mapper(Order, openorders, entity_name='open'), primaryjoin = sa.and_(openorders.c.isopen == 1, users.c.id==openorders.c.user_id), lazy=True),
- closed_orders = relation(mapper(Order, closedorders,entity_name='closed'), primaryjoin = sa.and_(closedorders.c.isopen == 0, users.c.id==closedorders.c.user_id), lazy=True)
+ open_orders = relation(open_mapper, primaryjoin = sa.and_(openorders.c.isopen == 1, users.c.id==openorders.c.user_id), lazy=True),
+ closed_orders = relation(closed_mapper, primaryjoin = sa.and_(closedorders.c.isopen == 0, users.c.id==closedorders.c.user_id), lazy=True)
))
q = create_session().query(User)
sess = create_session()
user = sess.query(User).get(7)
- assert [Order(id=1), Order(id=5)] == create_session().query(Order, entity_name='closed').with_parent(user, property='closed_orders').all()
- assert [Order(id=3)] == create_session().query(Order, entity_name='open').with_parent(user, property='open_orders').all()
+ assert [Order(id=1), Order(id=5)] == create_session().query(closed_mapper).with_parent(user, property='closed_orders').all()
+ assert [Order(id=3)] == create_session().query(open_mapper).with_parent(user, property='open_orders').all()
@testing.resolve_artifact_names
def test_many_to_many(self):
'create_instance', 'populate_instance', 'on_reconstitute', 'append_result',
'before_update', 'after_update', 'before_delete', 'after_delete'])
- @testing.resolve_artifact_names
- def test_single_instrumentor(self):
- ext_None, methods_None = self.extension()
- ext_x, methods_x = self.extension()
-
- def reset():
- sa.orm.clear_mappers()
- del methods_None[:]
- del methods_x[:]
-
- mapper(User, users, extension=ext_None())
- mapper(User, users, extension=ext_x(), entity_name='x')
- User()
-
- eq_(methods_None, ['instrument_class', 'init_instance'])
- eq_(methods_x, [])
-
- reset()
-
- mapper(User, users, extension=ext_x(), entity_name='x')
- mapper(User, users, extension=ext_None())
- User()
-
- eq_(methods_x, ['instrument_class', 'init_instance'])
- eq_(methods_None, [])
-
- reset()
-
- ext_y, methods_y = self.extension()
-
- mapper(User, users, extension=ext_x(), entity_name='x')
- mapper(User, users, extension=ext_y(), entity_name='y')
- User()
-
- eq_(methods_x, ['instrument_class', 'init_instance'])
- eq_(methods_y, [])
-
class RequirementsTest(_base.MappedTest):
"""Tests the contract for user classes."""
sess3.flush()
self.assert_sql_count(testing.db, go, 0)
- @testing.resolve_artifact_names
- def test_dont_load_sets_entityname(self):
- """dont_load-merged entity has entity_name set, has_mapper() passes, and lazyloads work"""
-
- mapper(User, users, properties={
- 'addresses':relation(mapper(Address, addresses), uselist=True)})
-
- sess = create_session()
- u = User()
- u.id = 7
- u.name = "fred"
- a1 = Address()
- a1.email_address='foo@bar.com'
- u.addresses.append(a1)
-
- sess.add(u)
- sess.flush()
- sess.clear()
-
- # reload 'u' such that its addresses list hasn't loaded
- u = sess.query(User).get(7)
-
- sess2 = create_session()
- u2 = sess2.merge(u, dont_load=True)
- assert not sess2.dirty
- # assert merged instance has a mapper and lazy load proceeds
- state = sa.orm.attributes.instance_state(u2)
- assert state.entity_name is not sa.orm.attributes.NO_ENTITY_NAME
- assert sa.orm.util.has_mapper(u2)
- def go():
- ne_( u2.addresses, [])
- eq_(len(u2.addresses), 1)
- self.assert_sql_count(testing.db, go, 1)
@testing.resolve_artifact_names
def test_dont_load_sets_backrefs(self):
@testing.resolve_artifact_names
def test_identity_key_1(self):
mapper(User, users)
- mapper(User, users, entity_name="en")
s = create_session()
key = s.identity_key(User, 1)
- eq_(key, (User, (1,), None))
- key = s.identity_key(User, 1, "en")
- eq_(key, (User, (1,), "en"))
- key = s.identity_key(User, 1, entity_name="en")
- eq_(key, (User, (1,), "en"))
- key = s.identity_key(User, ident=1, entity_name="en")
- eq_(key, (User, (1,), "en"))
+ eq_(key, (User, (1,)))
+ key = s.identity_key(User, ident=1)
+ eq_(key, (User, (1,)))
@testing.resolve_artifact_names
def test_identity_key_2(self):
s.add(u)
s.flush()
key = s.identity_key(instance=u)
- eq_(key, (User, (u.id,), None))
+ eq_(key, (User, (u.id,)))
@testing.resolve_artifact_names
def test_identity_key_3(self):
mapper(User, users)
- mapper(User, users, entity_name="en")
s = create_session()
row = {users.c.id: 1, users.c.name: "Frank"}
key = s.identity_key(User, row=row)
- eq_(key, (User, (1,), None))
- key = s.identity_key(User, row=row, entity_name="en")
- eq_(key, (User, (1,), "en"))
+ eq_(key, (User, (1,)))
@testing.resolve_artifact_names
def test_extension(self):