from sqlalchemy import exc as sa_exc
from sqlalchemy import sql, util, log, event
from sqlalchemy.sql import util as sql_util
-from sqlalchemy.sql import visitors, expression, operators
-from sqlalchemy.orm import mapper, attributes, interfaces, exc as orm_exc
+from sqlalchemy.sql import visitors
+from sqlalchemy.orm import attributes, interfaces, exc as orm_exc
from sqlalchemy.orm.mapper import _none_set
from sqlalchemy.orm.interfaces import (
LoaderStrategy, StrategizedOption, MapperOption, PropertyOption,
- serialize_path, deserialize_path, StrategizedProperty
+ StrategizedProperty
)
from sqlalchemy.orm import session as sessionlib, unitofwork
from sqlalchemy.orm import util as mapperutil
if adapter:
col = adapter.columns[col]
if col is not None and col in row:
- def new_execute(state, dict_, row):
+ def fetch_col(state, dict_, row):
dict_[key] = row[col]
- return new_execute, None, None
+ return fetch_col, None, None
else:
- def new_execute(state, dict_, row):
+ def expire_for_non_present_col(state, dict_, row):
state.expire_attribute_pre_commit(dict_, key)
- return new_execute, None, None
+ return expire_for_non_present_col, None, None
log.class_logger(ColumnLoader)
context, path, reduced_path, mapper, row, adapter)
elif not self.is_class_level:
- def new_execute(state, dict_, row):
+ def set_deferred_for_local_state(state, dict_, row):
state.set_callable(dict_, key, LoadDeferredColumns(state, key))
+ return set_deferred_for_local_state, None, None
else:
- def new_execute(state, dict_, row):
+ def reset_col_for_deferred(state, dict_, row):
# reset state on the key so that deferred callables
# fire off on next access.
state.reset(dict_, key)
-
- return new_execute, None, None
+ return reset_col_for_deferred, None, None
def init(self):
if hasattr(self.parent_property, 'composite_class'):
if passive is attributes.PASSIVE_NO_FETCH:
return attributes.PASSIVE_NO_RESULT
- prop = self.parent_property
localparent = state.manager.mapper
if self.group:
)
def create_row_processor(self, context, path, reduced_path, mapper, row, adapter):
+ # NoLoader: on row load, just initialize the attribute to its
+ # empty/default state — no SQL is ever emitted for it.
- def new_execute(state, dict_, row):
+ def invoke_no_load(state, dict_, row):
state.initialize(self.key)
- return new_execute, None, None
+ return invoke_no_load, None, None
log.class_logger(NoLoader)
(not self.parent_property.load_on_pending or not state.session_id):
return attributes.ATTR_EMPTY
- instance_mapper = state.manager.mapper
- prop = self.parent_property
- key = self.key
- prop_mapper = self.mapper
pending = not state.key
+ ident_key = None
if (
(passive is attributes.PASSIVE_NO_FETCH or \
raise orm_exc.DetachedInstanceError(
"Parent instance %s is not bound to a Session; "
"lazy load operation of attribute '%s' cannot proceed" %
- (mapperutil.state_str(state), key)
+ (mapperutil.state_str(state), self.key)
)
# if we have a simple primary key load, check the
# identity map without generating a Query at all
if self.use_get:
- if session._flushing:
- get_attr = instance_mapper._get_committed_state_attr_by_column
- else:
- get_attr = instance_mapper._get_state_attr_by_column
-
- dict_ = state.dict
- if passive is attributes.PASSIVE_NO_FETCH_RELATED:
- attr_passive = attributes.PASSIVE_OFF
- else:
- attr_passive = passive
-
- ident = [
- get_attr(
- state,
- state.dict,
- self._equated_columns[pk],
- passive=attr_passive)
- for pk in prop_mapper.primary_key
- ]
+ ident = self._get_ident_for_use_get(
+ session,
+ state,
+ passive
+ )
if attributes.PASSIVE_NO_RESULT in ident:
return attributes.PASSIVE_NO_RESULT
elif attributes.NEVER_SET in ident:
if _none_set.issuperset(ident):
return None
- ident_key = prop_mapper.identity_key_from_primary_key(ident)
+ ident_key = self.mapper.identity_key_from_primary_key(ident)
instance = Query._get_from_identity(session, ident_key, passive)
if instance is not None:
return instance
passive is attributes.PASSIVE_NO_FETCH_RELATED:
return attributes.PASSIVE_NO_RESULT
- q = session.query(prop_mapper)._adapt_all_clauses()
+ return self._emit_lazyload(session, state, ident_key)
+
+ def _get_ident_for_use_get(self, session, state, passive):
+ # Assemble the list of primary-key identity values used for a
+ # "use_get" identity-map lookup of the related object, one value
+ # per primary key column of the target mapper.
+ instance_mapper = state.manager.mapper
+
+ # during a flush, read only committed (pre-flush) attribute
+ # state so pending changes don't affect the lookup
+ if session._flushing:
+ get_attr = instance_mapper._get_committed_state_attr_by_column
+ else:
+ get_attr = instance_mapper._get_state_attr_by_column
+
+ # create a strong reference
+ # to state.dict
+ dict_ = state.dict
+
+ # NO_FETCH_RELATED is downgraded to PASSIVE_OFF here so the
+ # parent's own column values may still be fetched; only the
+ # related-object SQL load is being suppressed by the caller
+ if passive is attributes.PASSIVE_NO_FETCH_RELATED:
+ attr_passive = attributes.PASSIVE_OFF
+ else:
+ attr_passive = passive
+
+ # map each target PK column through _equated_columns to the
+ # corresponding local column on this state
+ return [
+ get_attr(
+ state,
+ state.dict,
+ self._equated_columns[pk],
+ passive=attr_passive)
+ for pk in self.mapper.primary_key
+ ]
+
+ def _emit_lazyload(self, session, state, ident_key):
+ q = session.query(self.mapper)._adapt_all_clauses()
q = q._with_invoke_all_eagers(False)
+ pending = not state.key
+
# don't autoflush on pending
if pending:
q = q.autoflush(False)
if state.load_path:
- q = q._with_current_path(state.load_path + (key,))
+ q = q._with_current_path(state.load_path + (self.key,))
if state.load_options:
q = q._conditional_options(*state.load_options)
if self.use_get:
return q._load_on_ident(ident_key)
- if prop.order_by:
- q = q.order_by(*util.to_list(prop.order_by))
+ if self.parent_property.order_by:
+ q = q.order_by(*util.to_list(self.parent_property.order_by))
- for rev in prop._reverse_property:
+ for rev in self.parent_property._reverse_property:
# reverse props that are MANYTOONE are loading *this*
# object from get(), so don't need to eager out to those.
if rev.direction is interfaces.MANYTOONE and \
util.warn(
"Multiple rows returned with "
"uselist=False for lazily-loaded attribute '%s' "
- % prop)
+ % self.parent_property)
return result[0]
else:
return None
+
def create_row_processor(self, context, path, reduced_path,
mapper, row, adapter):
key = self.key
if not self.is_class_level:
- def new_execute(state, dict_, row):
+ def set_lazy_callable(state, dict_, row):
# we are not the primary manager for this attribute
# on this class - set up a
# per-instance lazyloader, which will override the
# attribute - "eager" attributes always have a
# class-level lazyloader installed.
state.set_callable(dict_, key, LoadLazyAttribute(state, key))
+ return set_lazy_callable, None, None
else:
- def new_execute(state, dict_, row):
+ def reset_for_lazy_callable(state, dict_, row):
# we are the primary manager for this attribute on
# this class - reset its
# per-instance attribute state, so that the class-level
# any existing state.
state.reset(dict_, key)
- return new_execute, None, None
+ return reset_for_lazy_callable, None, None
@classmethod
def _create_lazy_clause(cls, prop, reverse_direction=False):
parentmapper=None, **kwargs):
pass
- def create_row_processor(self, context, path, reduced_path, mapper, row, adapter):
- def execute(state, dict_, row):
+ def create_row_processor(self, context, path, reduced_path,
+ mapper, row, adapter):
+ def load_immediate(state, dict_, row):
+ # invoke the attribute's get() immediately per-row,
+ # loading the value up front rather than deferring it
state.get_impl(self.key).get(state, dict_)
- return None, None, execute
+ return None, None, load_immediate
class SubqueryLoader(AbstractRelationshipLoader):
def init(self):
init_class_attribute(mapper)
def setup_query(self, context, entity,
- path, reduced_path, adapter, column_collection=None,
+ path, reduced_path, adapter,
+ column_collection=None,
parentmapper=None, **kwargs):
if not context.query._enable_eagerloads:
if len(path) / 2 > self.join_depth:
return
else:
- if self.mapper.base_mapper in interfaces._reduce_path(subq_path):
+ if self.mapper.base_mapper in \
+ interfaces._reduce_path(subq_path):
return
+ subq_mapper, leftmost_mapper, leftmost_prop, \
+ leftmost_cols, remote_cols, leftmost_attr = \
+ self._get_leftmost(subq_path)
+
orig_query = context.attributes.get(
("orig_query", SubqueryLoader),
context.query)
+ # generate a new Query from the original, then
+ # produce a subquery from it.
+ left_alias = self._generate_from_original_query(
+ orig_query, leftmost_mapper,
+ leftmost_attr, subq_path
+ )
+
+ # generate another Query that will join the
+ # left alias to the target relationships.
+ # basically doing a longhand
+ # "from_self()". (from_self() itself not quite industrial
+ # strength enough for all contingencies...but very close)
+ q = orig_query.session.query(self.mapper)
+ q._attributes = {
+ ("orig_query", SubqueryLoader): orig_query,
+ ('subquery_path', None) : subq_path
+ }
+ q = q._enable_single_crit(False)
+
+ to_join, local_attr, parent_alias = \
+ self._prep_for_joins(left_alias, subq_path)
+ q = q.order_by(*local_attr)
+ q = q.add_columns(*local_attr)
+
+ q = self._apply_joins(q, to_join, left_alias, parent_alias)
+
+ q = self._setup_options(q, subq_path, orig_query)
+ q = self._setup_outermost_orderby(q)
+
+ # add new query to attributes to be picked up
+ # by create_row_processor
+ context.attributes[('subquery', reduced_path)] = q
+
+ def _get_leftmost(self, subq_path):
subq_mapper = mapperutil._class_to_mapper(subq_path[0])
# determine attributes of the leftmost mapper
leftmost_mapper._columntoproperty[c].class_attribute
for c in leftmost_cols
]
+ return subq_mapper, leftmost_mapper, leftmost_prop, \
+ leftmost_cols, remote_cols, leftmost_attr
+ def _generate_from_original_query(self,
+ orig_query, leftmost_mapper,
+ leftmost_attr, subq_path
+ ):
# reformat the original query
# to look only for significant columns
q = orig_query._clone()
# which we'll join onto.
embed_q = q.with_labels().subquery()
left_alias = mapperutil.AliasedClass(leftmost_mapper, embed_q)
+ return left_alias
- # q becomes a new query. basically doing a longhand
- # "from_self()". (from_self() itself not quite industrial
- # strength enough for all contingencies...but very close)
-
- q = q.session.query(self.mapper)
- q._attributes = {
- ("orig_query", SubqueryLoader): orig_query,
- ('subquery_path', None) : subq_path
- }
- q = q._enable_single_crit(False)
+ def _prep_for_joins(self, left_alias, subq_path):
# figure out what's being joined. a.k.a. the fun part
to_join = [
(subq_path[i], subq_path[i+1])
for c in local_cols
]
- q = q.order_by(*local_attr)
- q = q.add_columns(*local_attr)
+ return to_join, local_attr, parent_alias
+ def _apply_joins(self, q, to_join, left_alias, parent_alias):
for i, (mapper, key) in enumerate(to_join):
# we need to use query.join() as opposed to
q = q.join(parent_alias, attr, from_joinpoint=True)
else:
q = q.join(attr, aliased=middle, from_joinpoint=True)
+ return q
+
+ def _local_remote_columns(self, prop):
+ # Return (local columns, remote columns) for the relationship.
+ # Without a secondary table, these come straight from
+ # local_remote_pairs; with a secondary (many-to-many) table,
+ # take the left side of each synchronize pair instead.
+ if prop.secondary is None:
+ return zip(*prop.local_remote_pairs)
+ else:
+ return \
+ [p[0] for p in prop.synchronize_pairs],\
+ [
+ p[0] for p in prop.
+ secondary_synchronize_pairs
+ ]
+ def _setup_options(self, q, subq_path, orig_query):
# propagate loader options etc. to the new query.
# these will fire relative to subq_path.
q = q._with_current_path(subq_path)
q = q._conditional_options(*orig_query._with_options)
+ return q
+ def _setup_outermost_orderby(self, q):
if self.parent_property.order_by:
# if there's an ORDER BY, alias it the same
# way joinedloader does, but we have to pull out
)
)
q = q.order_by(*eager_order_by)
-
- # add new query to attributes to be picked up
- # by create_row_processor
- context.attributes[('subquery', reduced_path)] = q
-
- def _local_remote_columns(self, prop):
- if prop.secondary is None:
- return zip(*prop.local_remote_pairs)
- else:
- return \
- [p[0] for p in prop.synchronize_pairs],\
- [
- p[0] for p in prop.
- secondary_synchronize_pairs
- ]
+ return q
def create_row_processor(self, context, path, reduced_path,
mapper, row, adapter):
local_cols, remote_cols = self._local_remote_columns(self.parent_property)
- remote_attr = [
- self.mapper._columntoproperty[c].key
- for c in remote_cols]
-
q = context.attributes[('subquery', reduced_path)]
collections = dict(
local_cols = [adapter.columns[c] for c in local_cols]
if self.uselist:
- def execute(state, dict_, row):
- collection = collections.get(
- tuple([row[col] for col in local_cols]),
- ()
- )
- state.get_impl(self.key).\
- set_committed_value(state, dict_, collection)
+ return self._create_collection_loader(collections, local_cols)
else:
- def execute(state, dict_, row):
- collection = collections.get(
- tuple([row[col] for col in local_cols]),
- (None,)
- )
- if len(collection) > 1:
- util.warn(
- "Multiple rows returned with "
- "uselist=False for eagerly-loaded attribute '%s' "
- % self)
+ return self._create_scalar_loader(collections, local_cols)
+
+ def _create_collection_loader(self, collections, local_cols):
+ # Build the uselist=True row processor: look up this row's
+ # collection by its local-column key tuple and install it as
+ # the committed value of the attribute (no events fired).
+ def load_collection_from_subq(state, dict_, row):
+ collection = collections.get(
+ tuple([row[col] for col in local_cols]),
+ ()
+ )
+ state.get_impl(self.key).\
+ set_committed_value(state, dict_, collection)
- scalar = collection[0]
- state.get_impl(self.key).\
- set_committed_value(state, dict_, scalar)
+ return load_collection_from_subq, None, None
- return execute, None, None
+ def _create_scalar_loader(self, collections, local_cols):
+ # Build the uselist=False row processor: the subquery result is
+ # still keyed like a collection, but only its first element is
+ # installed as the scalar attribute value.
+ def load_scalar_from_subq(state, dict_, row):
+ collection = collections.get(
+ tuple([row[col] for col in local_cols]),
+ (None,)
+ )
+ # more than one row for a scalar attribute indicates a
+ # relationship misconfiguration — warn, keep the first
+ if len(collection) > 1:
+ util.warn(
+ "Multiple rows returned with "
+ "uselist=False for eagerly-loaded attribute '%s' "
+ % self)
+
+ scalar = collection[0]
+ state.get_impl(self.key).\
+ set_committed_value(state, dict_, scalar)
+
+ return load_scalar_from_subq, None, None
log.class_logger(SubqueryLoader)
path = path + (self.key,)
reduced_path = reduced_path + (self.key,)
- # check for user-defined eager alias
if ("user_defined_eager_row_processor", reduced_path) in\
context.attributes:
- clauses = context.attributes[
- ("user_defined_eager_row_processor",
- reduced_path)]
-
- adapter = entity._get_entity_clauses(context.query, context)
- if adapter and clauses:
- context.attributes[
- ("user_defined_eager_row_processor",
- reduced_path)] = clauses = clauses.wrap(adapter)
- elif adapter:
- context.attributes[
- ("user_defined_eager_row_processor",
- reduced_path)] = clauses = adapter
-
- add_to_collection = context.primary_columns
-
+ clauses, adapter, add_to_collection = \
+ self._get_user_defined_adapter(
+ context, entity, reduced_path, adapter
+ )
else:
# check for join_depth or basic recursion,
# if the current path was not explicitly stated as
if self.mapper.base_mapper in reduced_path:
return
- clauses = mapperutil.ORMAdapter(
- mapperutil.AliasedClass(self.mapper),
- equivalents=self.mapper._equivalent_columns,
- adapt_required=True)
-
- if self.parent_property.direction != interfaces.MANYTOONE:
- context.multi_row_eager_loaders = True
-
- innerjoin = allow_innerjoin and context.attributes.get(
- ("eager_join_type", path),
- self.parent_property.innerjoin)
- if not innerjoin:
- # if this is an outer join, all eager joins from
- # here must also be outer joins
- allow_innerjoin = False
-
- context.create_eager_joins.append(
- (self._create_eager_join, context,
- entity, path, adapter,
- parentmapper, clauses, innerjoin)
- )
-
- add_to_collection = context.secondary_columns
- context.attributes[
- ("eager_row_processor", reduced_path)
- ] = clauses
+ clauses, adapter, add_to_collection, \
+ allow_innerjoin = self._generate_row_adapter(
+ context, entity, path, reduced_path, adapter,
+ column_collection, parentmapper, allow_innerjoin
+ )
path += (self.mapper,)
reduced_path += (self.mapper.base_mapper,)
column_collection=add_to_collection,
allow_innerjoin=allow_innerjoin)
+ def _get_user_defined_adapter(self, context, entity,
+ reduced_path, adapter):
+ # A user-defined eager row processor (e.g. contains_eager())
+ # exists for this path: combine it with the entity's own clause
+ # adapter and store the result back into context.attributes.
+ clauses = context.attributes[
+ ("user_defined_eager_row_processor",
+ reduced_path)]
+
+ adapter = entity._get_entity_clauses(context.query, context)
+ if adapter and clauses:
+ # both present: wrap the user clauses in the entity adapter
+ context.attributes[
+ ("user_defined_eager_row_processor",
+ reduced_path)] = clauses = clauses.wrap(adapter)
+ elif adapter:
+ # only the entity adapter: use it directly as the clauses
+ context.attributes[
+ ("user_defined_eager_row_processor",
+ reduced_path)] = clauses = adapter
+
+ # user-defined eager columns go into the primary column set
+ add_to_collection = context.primary_columns
+ return clauses, adapter, add_to_collection
+
+ def _generate_row_adapter(self,
+ context, entity, path, reduced_path, adapter,
+ column_collection, parentmapper, allow_innerjoin
+ ):
+ # No user-defined processor: build a fresh aliased adapter for
+ # the joined-eager target and schedule the eager JOIN creation.
+ clauses = mapperutil.ORMAdapter(
+ mapperutil.AliasedClass(self.mapper),
+ equivalents=self.mapper._equivalent_columns,
+ adapt_required=True)
+
+ # anything other than many-to-one can multiply parent rows
+ if self.parent_property.direction != interfaces.MANYTOONE:
+ context.multi_row_eager_loaders = True
+
+ innerjoin = allow_innerjoin and context.attributes.get(
+ ("eager_join_type", path),
+ self.parent_property.innerjoin)
+ if not innerjoin:
+ # if this is an outer join, all eager joins from
+ # here must also be outer joins
+ allow_innerjoin = False
+
+ # defer actual join construction until the full query is known
+ context.create_eager_joins.append(
+ (self._create_eager_join, context,
+ entity, path, adapter,
+ parentmapper, clauses, innerjoin)
+ )
+
+ # eager-loaded columns go into the secondary column set
+ add_to_collection = context.secondary_columns
+ context.attributes[
+ ("eager_row_processor", reduced_path)
+ ] = clauses
+ return clauses, adapter, add_to_collection, allow_innerjoin
+
def _create_eager_join(self, context, entity,
path, adapter, parentmapper,
clauses, innerjoin):
return False
try:
- identity_key = self.mapper.identity_key_from_row(row, decorator)
+ self.mapper.identity_key_from_row(row, decorator)
return decorator
- except KeyError, k:
+ except KeyError:
# no identity key - dont return a row
# processor, will cause a degrade to lazy
return False
our_reduced_path + (self.mapper.base_mapper,),
eager_adapter)
- def eager_exec(state, dict_, row):
- _instance(row, None)
-
if not self.uselist:
- def new_execute(state, dict_, row):
- # set a scalar object instance directly on the parent
- # object, bypassing InstrumentedAttribute event handlers.
- dict_[key] = _instance(row, None)
-
- def existing_execute(state, dict_, row):
- # call _instance on the row, even though the object has
- # been created, so that we further descend into properties
- existing = _instance(row, None)
- if existing is not None \
- and key in dict_ \
- and existing is not dict_[key]:
- util.warn(
- "Multiple rows returned with "
- "uselist=False for eagerly-loaded attribute '%s' "
- % self)
- return new_execute, existing_execute, None, eager_exec
+ return self._create_scalar_loader(context, key, _instance)
else:
- def new_execute(state, dict_, row):
- collection = attributes.init_state_collection(
- state, dict_, key)
- result_list = util.UniqueAppender(collection,
- 'append_without_event')
- context.attributes[(state, key)] = result_list
- _instance(row, result_list)
-
- def existing_execute(state, dict_, row):
- if (state, key) in context.attributes:
- result_list = context.attributes[(state, key)]
- else:
- # appender_key can be absent from context.attributes
- # with isnew=False when self-referential eager loading
- # is used; the same instance may be present in two
- # distinct sets of result columns
- collection = attributes.init_state_collection(state,
- dict_, key)
- result_list = util.UniqueAppender(
- collection,
- 'append_without_event')
- context.attributes[(state, key)] = result_list
- _instance(row, result_list)
- return new_execute, existing_execute, None, eager_exec
+ return self._create_collection_loader(context, key, _instance)
else:
return self.parent_property.\
_get_strategy(LazyLoader).\
reduced_path,
mapper, row, adapter)
+ def _create_collection_loader(self, context, key, _instance):
+ # Joined-eager uselist=True processors: "new row" initializes the
+ # collection and appender, "existing row" reuses or rebuilds the
+ # appender, "exec" descends into the row without appending.
+ def load_collection_from_joined_new_row(state, dict_, row):
+ collection = attributes.init_state_collection(
+ state, dict_, key)
+ # UniqueAppender dedupes and bypasses collection events
+ result_list = util.UniqueAppender(collection,
+ 'append_without_event')
+ context.attributes[(state, key)] = result_list
+ _instance(row, result_list)
+
+ def load_collection_from_joined_existing_row(state, dict_, row):
+ if (state, key) in context.attributes:
+ result_list = context.attributes[(state, key)]
+ else:
+ # appender_key can be absent from context.attributes
+ # with isnew=False when self-referential eager loading
+ # is used; the same instance may be present in two
+ # distinct sets of result columns
+ collection = attributes.init_state_collection(state,
+ dict_, key)
+ result_list = util.UniqueAppender(
+ collection,
+ 'append_without_event')
+ context.attributes[(state, key)] = result_list
+ _instance(row, result_list)
+
+ def load_collection_from_joined_exec(state, dict_, row):
+ # process the eager-target row for its own sake only
+ _instance(row, None)
+
+ return load_collection_from_joined_new_row, \
+ load_collection_from_joined_existing_row, \
+ None, load_collection_from_joined_exec
+
+ def _create_scalar_loader(self, context, key, _instance):
+ # Joined-eager uselist=False processors: scalar assignment on a
+ # new row, duplicate detection on an existing row, and a bare
+ # descend-only processor for the exec slot.
+ def load_scalar_from_joined_new_row(state, dict_, row):
+ # set a scalar object instance directly on the parent
+ # object, bypassing InstrumentedAttribute event handlers.
+ dict_[key] = _instance(row, None)
+
+ def load_scalar_from_joined_existing_row(state, dict_, row):
+ # call _instance on the row, even though the object has
+ # been created, so that we further descend into properties
+ existing = _instance(row, None)
+ # a second, different instance for a scalar attribute means
+ # the JOIN produced multiple rows — warn, keep the original
+ if existing is not None \
+ and key in dict_ \
+ and existing is not dict_[key]:
+ util.warn(
+ "Multiple rows returned with "
+ "uselist=False for eagerly-loaded attribute '%s' "
+ % self)
+
+ def load_scalar_from_joined_exec(state, dict_, row):
+ _instance(row, None)
+
+ return load_scalar_from_joined_new_row, \
+ load_scalar_from_joined_existing_row, \
+ None, load_scalar_from_joined_exec
+
EagerLoader = JoinedLoader
"""Deprecated, use JoinedLoader"""