# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-"""sqlalchemy.orm.interfaces.LoaderStrategy implementations, and related MapperOptions."""
+"""sqlalchemy.orm.interfaces.LoaderStrategy
+ implementations, and related MapperOptions."""
from sqlalchemy import exc as sa_exc
from sqlalchemy import sql, util, log
attribute_ext.insert(0, _SingleParentValidator(prop))
if prop.key in prop.parent._validators:
- attribute_ext.insert(0, mapperutil.Validator(prop.key, prop.parent._validators[prop.key]))
+ attribute_ext.insert(0,
+ mapperutil.Validator(prop.key, prop.parent._validators[prop.key])
+ )
if useobject:
attribute_ext.append(sessionlib.UOWEventHandler(prop.key))
)
class UninstrumentedColumnLoader(LoaderStrategy):
- """Represent the strategy for a MapperProperty that doesn't instrument the class.
+ """Represent the a non-instrumented MapperProperty.
The polymorphic_on argument of mapper() often results in this,
if the argument is against the with_polymorphic selectable.
def init(self):
self.columns = self.parent_property.columns
- def setup_query(self, context, entity, path, adapter, column_collection=None, **kwargs):
+ def setup_query(self, context, entity, path, adapter,
+ column_collection=None, **kwargs):
for c in self.columns:
if adapter:
c = adapter.columns[c]
column_collection.append(c)
def create_row_processor(self, selectcontext, path, mapper, row, adapter):
- return (None, None)
+ return None, None
class ColumnLoader(LoaderStrategy):
"""Strategize the loading of a plain column-based MapperProperty."""
self.columns = self.parent_property.columns
self.is_composite = hasattr(self.parent_property, 'composite_class')
- def setup_query(self, context, entity, path, adapter, column_collection=None, **kwargs):
+ def setup_query(self, context, entity, path, adapter,
+ column_collection=None, **kwargs):
for c in self.columns:
if adapter:
c = adapter.columns[c]
def copy(obj):
if obj is None:
return None
- return self.parent_property.composite_class(*obj.__composite_values__())
+ return self.parent_property.\
+ composite_class(*obj.__composite_values__())
def compare(a, b):
if a is None or b is None:
#active_history ?
)
- def create_row_processor(self, selectcontext, path, mapper, row, adapter):
+ def create_row_processor(self, selectcontext, path, mapper,
+ row, adapter):
key = self.key
columns = self.columns
composite_class = self.parent_property.composite_class
def init(self):
if hasattr(self.parent_property, 'composite_class'):
- raise NotImplementedError("Deferred loading for composite types not implemented yet")
+ raise NotImplementedError("Deferred loading for composite "
+ "types not implemented yet")
self.columns = self.parent_property.columns
self.group = self.parent_property.group
expire_missing=False
)
- def setup_query(self, context, entity, path, adapter, only_load_props=None, **kwargs):
- if \
- (self.group is not None and context.attributes.get(('undefer', self.group), False)) or \
- (only_load_props and self.key in only_load_props):
-
+ def setup_query(self, context, entity, path, adapter,
+ only_load_props=None, **kwargs):
+ if (
+ self.group is not None and
+ context.attributes.get(('undefer', self.group), False)
+ ) or (only_load_props and self.key in only_load_props):
self.parent_property._get_strategy(ColumnLoader).\
- setup_query(context, entity, path, adapter, **kwargs)
+ setup_query(context, entity,
+ path, adapter, **kwargs)
def _class_level_loader(self, state):
if not mapperutil._state_has_identity(state):
session = sessionlib._state_session(state)
if session is None:
raise orm_exc.DetachedInstanceError(
- "Parent instance %s is not bound to a Session; "
- "deferred load operation of attribute '%s' cannot proceed" %
- (mapperutil.state_str(state), self.key)
- )
+ "Parent instance %s is not bound to a Session; "
+ "deferred load operation of attribute '%s' cannot proceed" %
+ (mapperutil.state_str(state), self.key)
+ )
query = session.query(localparent)
ident = state.key[1]
- query._get(None, ident=ident, only_load_props=group, refresh_state=state)
+ query._get(None, ident=ident,
+ only_load_props=group, refresh_state=state)
return attributes.ATTR_WAS_SET
class DeferredOption(StrategizedOption):
query._attributes[('undefer', self.group)] = True
class AbstractRelationshipLoader(LoaderStrategy):
- """LoaderStratgies which deal with related objects as opposed to scalars."""
+ """LoaderStratgies which deal with related objects."""
def init(self):
self.mapper = self.parent_property.mapper
for c in self.mapper._equivalent_columns[col]:
self._equated_columns[c] = self._equated_columns[col]
- self.logger.info("%s will use query.get() to optimize instance loads" % self)
+ self.logger.info("%s will use query.get() to "
+ "optimize instance loads" % self)
def init_class_attribute(self, mapper):
self.is_class_level = True
- # MANYTOONE currently only needs the "old" value for delete-orphan
- # cascades. the required _SingleParentValidator will enable active_history
- # in that case. otherwise we don't need the "old" value during backref operations.
+ # MANYTOONE currently only needs the
+ # "old" value for delete-orphan
+ # cascades. the required _SingleParentValidator
+ # will enable active_history
+ # in that case. otherwise we don't need the
+ # "old" value during backref operations.
_register_attribute(self,
mapper,
useobject=True,
callable_=self._class_level_loader,
uselist = self.parent_property.uselist,
typecallable = self.parent_property.collection_class,
- active_history = self.parent_property.direction is not interfaces.MANYTOONE or not self.use_get,
+ active_history = \
+ self.parent_property.direction is not \
+ interfaces.MANYTOONE or \
+ not self.use_get,
)
- def lazy_clause(self, state, reverse_direction=False, alias_secondary=False, adapt_source=None):
+ def lazy_clause(self, state, reverse_direction=False,
+ alias_secondary=False, adapt_source=None):
if state is None:
- return self._lazy_none_clause(reverse_direction, adapt_source=adapt_source)
+ return self._lazy_none_clause(
+ reverse_direction,
+ adapt_source=adapt_source)
if not reverse_direction:
- (criterion, bind_to_col, rev) = (self.__lazywhere, self.__bind_to_col, self._equated_columns)
+ criterion, bind_to_col, rev = \
+ self.__lazywhere, \
+ self.__bind_to_col, \
+ self._equated_columns
else:
- (criterion, bind_to_col, rev) = LazyLoader._create_lazy_clause(self.parent_property, reverse_direction=reverse_direction)
+ criterion, bind_to_col, rev = \
+ LazyLoader._create_lazy_clause(
+ self.parent_property,
+ reverse_direction=reverse_direction)
if reverse_direction:
mapper = self.parent_property.mapper
def visit_bindparam(bindparam):
if bindparam.key in bind_to_col:
- # use the "committed" (database) version to get query column values
- # also its a deferred value; so that when used by Query, the committed value is used
+ # use the "committed" (database) version to get
+ # query column values
+ # also its a deferred value; so that when used
+ # by Query, the committed value is used
# after an autoflush occurs
o = state.obj() # strong ref
- bindparam.value = lambda: mapper._get_committed_attr_by_column(o, bind_to_col[bindparam.key])
+ bindparam.value = \
+ lambda: mapper._get_committed_attr_by_column(
+ o, bind_to_col[bindparam.key])
if self.parent_property.secondary is not None and alias_secondary:
- criterion = sql_util.ClauseAdapter(self.parent_property.secondary.alias()).traverse(criterion)
+ criterion = sql_util.ClauseAdapter(
+ self.parent_property.secondary.alias()).\
+ traverse(criterion)
- criterion = visitors.cloned_traverse(criterion, {}, {'bindparam':visit_bindparam})
+ criterion = visitors.cloned_traverse(
+ criterion, {}, {'bindparam':visit_bindparam})
if adapt_source:
criterion = adapt_source(criterion)
return criterion
def _lazy_none_clause(self, reverse_direction=False, adapt_source=None):
if not reverse_direction:
- (criterion, bind_to_col, rev) = (self.__lazywhere, self.__bind_to_col, self._equated_columns)
+ criterion, bind_to_col, rev = \
+ self.__lazywhere, \
+ self.__bind_to_col,\
+ self._equated_columns
else:
- (criterion, bind_to_col, rev) = LazyLoader._create_lazy_clause(self.parent_property, reverse_direction=reverse_direction)
+ criterion, bind_to_col, rev = \
+ LazyLoader._create_lazy_clause(
+ self.parent_property,
+ reverse_direction=reverse_direction)
criterion = sql_util.adapt_criterion_to_null(criterion, bind_to_col)
key = self.key
if not self.is_class_level:
def new_execute(state, dict_, row):
- # we are not the primary manager for this attribute on this class - set up a
- # per-instance lazyloader, which will override the class-level behavior.
- # this currently only happens when using a "lazyload" option on a "no load"
- # attribute - "eager" attributes always have a class-level lazyloader
- # installed.
+ # we are not the primary manager for this attribute
+ # on this class - set up a
+ # per-instance lazyloader, which will override the
+ # class-level behavior.
+ # this currently only happens when using a
+ # "lazyload" option on a "no load"
+ # attribute - "eager" attributes always have a
+ # class-level lazyloader installed.
state.set_callable(dict_, key, LoadLazyAttribute(state, key))
else:
def new_execute(state, dict_, row):
- # we are the primary manager for this attribute on this class - reset its
- # per-instance attribute state, so that the class-level lazy loader is
- # executed when next referenced on this instance. this is needed in
- # populate_existing() types of scenarios to reset any existing state.
+ # we are the primary manager for this attribute on
+ # this class - reset its
+ # per-instance attribute state, so that the class-level
+ # lazy loader is
+ # executed when next referenced on this instance.
+ # this is needed in
+ # populate_existing() types of scenarios to reset
+ # any existing state.
state.reset(dict_, key)
return new_execute, None
-
+
+ @classmethod
def _create_lazy_clause(cls, prop, reverse_direction=False):
binds = util.column_dict()
lookup = util.column_dict()
lazywhere = prop.primaryjoin
if prop.secondaryjoin is None or not reverse_direction:
- lazywhere = visitors.replacement_traverse(lazywhere, {}, col_to_bind)
+ lazywhere = visitors.replacement_traverse(
+ lazywhere, {}, col_to_bind)
if prop.secondaryjoin is not None:
secondaryjoin = prop.secondaryjoin
if reverse_direction:
- secondaryjoin = visitors.replacement_traverse(secondaryjoin, {}, col_to_bind)
+ secondaryjoin = visitors.replacement_traverse(
+ secondaryjoin, {}, col_to_bind)
lazywhere = sql.and_(lazywhere, secondaryjoin)
bind_to_col = dict((binds[col].key, col) for col in binds)
- return (lazywhere, bind_to_col, equated_columns)
- _create_lazy_clause = classmethod(_create_lazy_clause)
+ return lazywhere, bind_to_col, equated_columns
log.class_logger(LazyLoader)
prop = instance_mapper.get_property(self.key)
strategy = prop._get_strategy(LazyLoader)
- if kw.get('passive') is attributes.PASSIVE_NO_FETCH and not strategy.use_get:
+ if kw.get('passive') is attributes.PASSIVE_NO_FETCH and \
+ not strategy.use_get:
return attributes.PASSIVE_NO_RESULT
if strategy._should_log_debug():
strategy.logger.debug("loading %s",
- mapperutil.state_attribute_str(state, self.key))
+ mapperutil.state_attribute_str(
+ state, self.key))
session = sessionlib._state_session(state)
if session is None:
ident = []
allnulls = True
for primary_key in prop.mapper.primary_key:
- val = instance_mapper._get_committed_state_attr_by_column(
- state, strategy._equated_columns[primary_key], **kw)
+ val = instance_mapper.\
+ _get_committed_state_attr_by_column(
+ state,
+ strategy._equated_columns[primary_key],
+ **kw)
if val is attributes.PASSIVE_NO_RESULT:
return val
allnulls = allnulls and val is None
if l > 1:
util.warn(
"Multiple rows returned with "
- "uselist=False for lazily-loaded attribute '%s' " % prop)
+ "uselist=False for lazily-loaded attribute '%s' "
+ % prop)
return result[0]
else:
class SubqueryLoader(AbstractRelationshipLoader):
def init_class_attribute(self, mapper):
- self.parent_property._get_strategy(LazyLoader).init_class_attribute(mapper)
+ self.parent_property.\
+ _get_strategy(LazyLoader).\
+ init_class_attribute(mapper)
- def setup_query(self, context, entity, path, adapter,
- column_collection=None, parentmapper=None, **kwargs):
+ def setup_query(self, context, entity,
+ path, adapter, column_collection=None,
+ parentmapper=None, **kwargs):
if not context.query._enable_eagerloads:
return
attr = self.parent_property.class_attribute
- # modify the query to just look for parent columns in the join condition
+ # modify the query to just look for parent columns in the
+ # join condition
- # TODO: what happens to options() in the parent query ? are they going
+ # TODO: what happens to options() in the parent query ?
+ # are they going
# to get in the way here ?
+ # set the original query to only look
+ # for the significant columns, not order
+ # by anything.
q = context.query._clone()
q._set_entities(local_attr)
+ q._order_by = None
+ # now select from it as a subquery.
q = q.from_self(self.mapper, *local_attr)
+ # and join to the related thing we want
+ # to load.
q = q.join(attr)
q = q.order_by(*local_attr)
else:
return \
[p[0] for p in self.parent_property.synchronize_pairs],\
- [p[0] for p in self.parent_property.secondary_synchronize_pairs]
+ [
+ p[0] for p in self.parent_property.
+ secondary_synchronize_pairs
+ ]
def create_row_processor(self, context, path, mapper, row, adapter):
path = path + (self.key,)
local_cols, remote_cols = self._local_remote_columns
local_attr = [self.parent._get_col_to_prop(c).key for c in local_cols]
- remote_attr = [self.mapper._get_col_to_prop(c).key for c in remote_cols]
+ remote_attr = [
+ self.mapper._get_col_to_prop(c).key
+ for c in remote_cols]
q = context.attributes[('subquery', path)]
- collections = dict((k, [v[0] for v in v]) for k, v in itertools.groupby(
+ collections = dict(
+ (k, [v[0] for v in v])
+ for k, v in itertools.groupby(
q,
lambda x:x[1:]
))
-
def execute(state, dict_, row):
collection = collections.get(
tuple([row[col] for col in local_cols]),
()
)
- state.get_impl(self.key).set_committed_value(state, dict_, collection)
+ state.get_impl(self.key).\
+ set_committed_value(state, dict_, collection)
- return (execute, None)
-
+ return execute, None
class EagerLoader(AbstractRelationshipLoader):
- """Strategize a relationship() that loads within the process of the parent object being selected."""
+ """Strategize a relationship() that loads within the process
+ of the parent object being selected."""
def init(self):
super(EagerLoader, self).init()
self.join_depth = self.parent_property.join_depth
def init_class_attribute(self, mapper):
- self.parent_property._get_strategy(LazyLoader).init_class_attribute(mapper)
+ self.parent_property.\
+ _get_strategy(LazyLoader).init_class_attribute(mapper)
def setup_query(self, context, entity, path, adapter, \
- column_collection=None, parentmapper=None, **kwargs):
+ column_collection=None, parentmapper=None,
+ **kwargs):
"""Add a left outer join to the statement thats being constructed."""
if not context.query._enable_eagerloads:
reduced_path = interfaces._reduce_path(path)
# check for user-defined eager alias
- if ("user_defined_eager_row_processor", reduced_path) in context.attributes:
- clauses = context.attributes[("user_defined_eager_row_processor", reduced_path)]
+ if ("user_defined_eager_row_processor", reduced_path) in\
+ context.attributes:
+ clauses = context.attributes[
+ ("user_defined_eager_row_processor",
+ reduced_path)]
adapter = entity._get_entity_clauses(context.query, context)
if adapter and clauses:
- context.attributes[("user_defined_eager_row_processor", reduced_path)] = \
- clauses = clauses.wrap(adapter)
+ context.attributes[
+ ("user_defined_eager_row_processor",
+ reduced_path)] = clauses = clauses.wrap(adapter)
elif adapter:
- context.attributes[("user_defined_eager_row_processor", reduced_path)] = \
- clauses = adapter
+ context.attributes[
+ ("user_defined_eager_row_processor",
+ reduced_path)] = clauses = adapter
add_to_collection = context.primary_columns
if self.mapper.base_mapper in reduced_path:
return
- clauses = mapperutil.ORMAdapter(mapperutil.AliasedClass(self.mapper),
- equivalents=self.mapper._equivalent_columns, adapt_required=True)
+ clauses = mapperutil.ORMAdapter(
+ mapperutil.AliasedClass(self.mapper),
+ equivalents=self.mapper._equivalent_columns,
+ adapt_required=True)
if self.parent_property.direction != interfaces.MANYTOONE:
context.multi_row_eager_loaders = True
context.create_eager_joins.append(
- (self._create_eager_join, context, entity, path, adapter, parentmapper, clauses)
+ (self._create_eager_join, context,
+ entity, path, adapter,
+ parentmapper, clauses)
)
add_to_collection = context.secondary_columns
- context.attributes[("eager_row_processor", reduced_path)] = clauses
+ context.attributes[
+ ("eager_row_processor", reduced_path)
+ ] = clauses
for value in self.mapper._iterate_polymorphic_properties():
value.setup(
parentmapper=self.mapper,
column_collection=add_to_collection)
- def _create_eager_join(self, context, entity, path, adapter, parentmapper, clauses):
+ def _create_eager_join(self, context, entity,
+ path, adapter, parentmapper, clauses):
if parentmapper is None:
localparent = entity.mapper
not should_nest_selectable and \
context.from_clause:
index, clause = \
- sql_util.find_join_source(context.from_clause, entity.selectable)
+ sql_util.find_join_source(
+ context.from_clause, entity.selectable)
if clause is not None:
# join to an existing FROM clause on the query.
# key it to its list index in the eager_joins dict.
- # Query._compile_context will adapt as needed and append to the
- # FROM clause of the select().
+ # Query._compile_context will adapt as needed and
+ # append to the FROM clause of the select().
entity_key, default_towrap = index, clause
if entity_key is None:
join_to_left = False
if adapter:
if getattr(adapter, 'aliased_class', None):
- onclause = getattr(adapter.aliased_class, self.key, self.parent_property)
+ onclause = getattr(
+ adapter.aliased_class, self.key,
+ self.parent_property)
else:
- onclause = getattr(mapperutil.AliasedClass(self.parent, adapter.selectable),
- self.key, self.parent_property)
+ onclause = getattr(
+ mapperutil.AliasedClass(
+ self.parent,
+ adapter.selectable
+ ),
+ self.key, self.parent_property
+ )
if onclause is self.parent_property:
- # TODO: this is a temporary hack to account for polymorphic eager loads where
+ # TODO: this is a temporary hack to
+ # account for polymorphic eager loads where
# the eagerload is referencing via of_type().
join_to_left = True
else:
onclause = self.parent_property
- innerjoin = context.attributes.get(("eager_join_type", path),
- self.parent_property.innerjoin)
+ innerjoin = context.attributes.get(
+ ("eager_join_type", path),
+ self.parent_property.innerjoin)
- context.eager_joins[entity_key] = eagerjoin = mapperutil.join(
- towrap,
- clauses.aliased_class,
- onclause,
- join_to_left=join_to_left,
- isouter=not innerjoin
- )
+ context.eager_joins[entity_key] = eagerjoin = \
+ mapperutil.join(
+ towrap,
+ clauses.aliased_class,
+ onclause,
+ join_to_left=join_to_left,
+ isouter=not innerjoin
+ )
# send a hint to the Query as to where it may "splice" this join
eagerjoin.stop_on = entity.selectable
if self.parent_property.secondary is None and \
not parentmapper:
# for parentclause that is the non-eager end of the join,
- # ensure all the parent cols in the primaryjoin are actually in the
+ # ensure all the parent cols in the primaryjoin are actually
+ # in the
# columns clause (i.e. are not deferred), so that aliasing applied
- # by the Query propagates those columns outward. This has the effect
+ # by the Query propagates those columns outward.
+ # This has the effect
# of "undefering" those columns.
- for col in sql_util.find_columns(self.parent_property.primaryjoin):
+ for col in sql_util.find_columns(
+ self.parent_property.primaryjoin):
if localparent.mapped_table.c.contains_column(col):
if adapter:
col = adapter.columns[col]
context.eager_order_by += \
eagerjoin._target_adapter.\
copy_and_process(
- util.to_list(self.parent_property.order_by)
+ util.to_list(
+ self.parent_property.order_by
+ )
)
def _create_eager_adapter(self, context, row, adapter, path):
reduced_path = interfaces._reduce_path(path)
- if ("user_defined_eager_row_processor", reduced_path) in context.attributes:
- decorator = context.attributes[("user_defined_eager_row_processor", reduced_path)]
- # user defined eagerloads are part of the "primary" portion of the load.
+ if ("user_defined_eager_row_processor", reduced_path) in \
+ context.attributes:
+ decorator = context.attributes[
+ ("user_defined_eager_row_processor",
+ reduced_path)]
+ # user defined eagerloads are part of the "primary"
+ # portion of the load.
# the adapters applied to the Query should be honored.
if context.adapter and decorator:
decorator = decorator.wrap(context.adapter)
elif context.adapter:
decorator = context.adapter
elif ("eager_row_processor", reduced_path) in context.attributes:
- decorator = context.attributes[("eager_row_processor", reduced_path)]
+ decorator = context.attributes[
+ ("eager_row_processor", reduced_path)]
else:
return False
def create_row_processor(self, context, path, mapper, row, adapter):
path = path + (self.key,)
- eager_adapter = self._create_eager_adapter(context, row, adapter, path)
+ eager_adapter = self._create_eager_adapter(
+ context,
+ row,
+ adapter, path)
if eager_adapter is not False:
key = self.key
return new_execute, existing_execute
else:
def new_execute(state, dict_, row):
- collection = attributes.init_state_collection(state, dict_,
- key)
+ collection = attributes.init_state_collection(
+ state, dict_, key)
result_list = util.UniqueAppender(collection,
'append_without_event')
context.attributes[(state, key)] = result_list
# distinct sets of result columns
collection = attributes.init_state_collection(state,
dict_, key)
- result_list = util.UniqueAppender(collection,
- 'append_without_event')
+ result_list = util.UniqueAppender(
+ collection,
+ 'append_without_event')
context.attributes[(state, key)] = result_list
_instance(row, result_list)
return new_execute, existing_execute
else:
- return self.parent_property._get_strategy(LazyLoader).\
- create_row_processor(context, path, mapper, row, adapter)
+ return self.parent_property.\
+ _get_strategy(LazyLoader).\
+ create_row_processor(
+ context, path,
+ mapper, row, adapter)
log.class_logger(EagerLoader)
if value is not None:
hasparent = initiator.hasparent(attributes.instance_state(value))
if hasparent and oldvalue is not value:
- raise sa_exc.InvalidRequestError("Instance %s is already associated with an instance "
- "of %s via its %s attribute, and is only allowed a single parent." %
+ raise sa_exc.InvalidRequestError(
+ "Instance %s is already associated with an instance "
+ "of %s via its %s attribute, and is only allowed a "
+ "single parent." %
(mapperutil.instance_str(value), state.class_, self.prop)
)
return value
@testing.resolve_artifact_names
def test_no_orphan(self):
"""An eagerly loaded child object is not marked as an orphan"""
+
mapper(User, users, properties={
'addresses':relationship(Address, cascade="all,delete-orphan", lazy=False)
})
sess = create_session()
user = sess.query(User).get(7)
- assert getattr(User, 'addresses').hasparent(sa.orm.attributes.instance_state(user.addresses[0]), optimistic=True)
- assert not sa.orm.class_mapper(Address)._is_orphan(sa.orm.attributes.instance_state(user.addresses[0]))
+ assert getattr(User, 'addresses').\
+ hasparent(sa.orm.attributes.instance_state(user.addresses[0]), optimistic=True)
+ assert not sa.orm.class_mapper(Address).\
+ _is_orphan(sa.orm.attributes.instance_state(user.addresses[0]))
@testing.resolve_artifact_names
def test_orderby(self):
mapper(User, users, properties = {
- 'addresses':relationship(mapper(Address, addresses), lazy=False, order_by=addresses.c.email_address),
+ 'addresses':relationship(mapper(Address, addresses),
+ lazy=False, order_by=addresses.c.email_address),
})
q = create_session().query(User)
eq_([
@testing.resolve_artifact_names
def test_orderby_multi(self):
mapper(User, users, properties = {
- 'addresses':relationship(mapper(Address, addresses), lazy=False, order_by=[addresses.c.email_address, addresses.c.id]),
+ 'addresses':relationship(mapper(Address, addresses),
+ lazy=False,
+ order_by=[addresses.c.email_address, addresses.c.id]),
})
q = create_session().query(User)
eq_([
@testing.resolve_artifact_names
def test_orderby_related(self):
- """A regular mapper select on a single table can order by a relationship to a second table"""
+ """A regular mapper select on a single table can
+ order by a relationship to a second table"""
+
mapper(Address, addresses)
mapper(User, users, properties = dict(
addresses = relationship(Address, lazy=False, order_by=addresses.c.id),
@testing.resolve_artifact_names
def test_deferred_fk_col(self):
- User, Address, Dingaling = self.classes.get_all(
- 'User', 'Address', 'Dingaling')
- users, addresses, dingalings = self.tables.get_all(
- 'users', 'addresses', 'dingalings')
-
mapper(Address, addresses, properties={
'user_id':deferred(addresses.c.user_id),
'user':relationship(User, lazy=False)
@testing.resolve_artifact_names
def test_many_to_many(self):
- Keyword, Item = self.Keyword, self.Item
- keywords, item_keywords, items = self.tables.get_all(
- 'keywords', 'item_keywords', 'items')
mapper(Keyword, keywords)
mapper(Item, items, properties = dict(
@testing.resolve_artifact_names
def test_eager_option(self):
- Keyword, Item = self.Keyword, self.Item
- keywords, item_keywords, items = self.tables.get_all(
- 'keywords', 'item_keywords', 'items')
-
mapper(Keyword, keywords)
mapper(Item, items, properties = dict(
keywords = relationship(Keyword, secondary=item_keywords, lazy=True,
@testing.resolve_artifact_names
def test_cyclical(self):
"""A circular eager relationship breaks the cycle with a lazy loader"""
- User, Address = self.User, self.Address
- users, addresses = self.tables.get_all('users', 'addresses')
mapper(Address, addresses)
mapper(User, users, properties = dict(
@testing.resolve_artifact_names
def test_double(self):
"""Eager loading with two relationships simultaneously, from the same table, using aliases."""
- User, Address, Order = self.classes.get_all(
- 'User', 'Address', 'Order')
- users, addresses, orders = self.tables.get_all(
- 'users', 'addresses', 'orders')
openorders = sa.alias(orders, 'openorders')
closedorders = sa.alias(orders, 'closedorders')
@testing.resolve_artifact_names
def test_double_same_mappers(self):
"""Eager loading with two relationships simulatneously, from the same table, using aliases."""
- User, Address, Order = self.classes.get_all(
- 'User', 'Address', 'Order')
- users, addresses, orders = self.tables.get_all(
- 'users', 'addresses', 'orders')
mapper(Address, addresses)
mapper(Order, orders, properties={
@testing.resolve_artifact_names
def test_no_false_hits(self):
"""Eager loaders don't interpret main table columns as part of their eager load."""
- User, Address, Order = self.classes.get_all(
- 'User', 'Address', 'Order')
- users, addresses, orders = self.tables.get_all(
- 'users', 'addresses', 'orders')
mapper(User, users, properties={
'addresses':relationship(Address, lazy=False),