pass
- def execute(self, selectcontext, instance, row, identitykey, isnew):
- """Called when the mapper receives a row.
-
- `instance` is the parent instance corresponding to the `row`.
+ def create_row_processor(self, selectcontext, mapper, row):
+ """return a tuple of a row processing and a row post-processing function.
+
+ Input arguments are the query.SelectionContext and the *first*
+ row of a result set obtained within query.Query.instances().
+ By looking at the columns present within the row, the MapperProperty
+ returns two callables which will be used to process each instance
+ produced from a result row.
+
+ The callables are of the following form:
+
+ def execute(instance, row, identitykey, isnew):
+ # process the incoming instance, given the row, its identitykey, and
+ # the isnew flag indicating whether this is the first row corresponding
+ # to this instance
+
+ def post_execute(instance):
+ # process instance after all result rows have been processed. this
+ # function should be used to issue additional selections in order to
+ # eagerly load additional properties.
+
+ return (execute, post_execute)
+
+ Either tuple value can also be None, in which case no function is called.
+
"""
-
- raise NotImplementedError()
-
- def post_execute(self, selectcontext, instance):
- """Called after all result rows have been received"""
-
raise NotImplementedError()
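# Illustration only: a minimal, self-contained sketch of the contract described
# above, driven the way query.Query.instances() would drive it.  ToyProperty,
# ToyInstance and the dict rows below are assumptions for the example, not
# SQLAlchemy API.

class ToyProperty(object):
    key = 'name'

    def create_row_processor(self, selectcontext, mapper, row):
        # only produce a populator if our column is present in the first row
        if self.key in row:
            def execute(instance, row, identitykey, isnew):
                # populate only once per instance
                if isnew:
                    instance.__dict__[self.key] = row[self.key]
            def post_execute(instance):
                # a real strategy could issue additional selects here
                pass
            return (execute, post_execute)
        return (None, None)

class ToyInstance(object):
    pass

rows = [{'name': 'ed'}, {'name': 'ed'}]
prop = ToyProperty()

# the *first* row determines which callables apply to the whole result
execute, post_execute = prop.create_row_processor(None, None, rows[0])

instance = ToyInstance()
for i, row in enumerate(rows):
    if execute is not None:
        execute(instance, row, ('toy', (1,)), i == 0)
if post_execute is not None:
    post_execute(instance)

assert instance.name == 'ed'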
+
def cascade_iterator(self, type, object, recursive=None, halt_on=None):
return []
def setup(self, querycontext, **kwargs):
pass
- def execute(self, selectcontext, instance, row, identitykey, isnew):
- pass
+ def create_row_processor(self, selectcontext, mapper, row):
+ return (None, None)
def do_init(self):
if not self.proxy:
return strategy
def setup(self, querycontext, **kwargs):
-
self._get_context_strategy(querycontext).setup_query(querycontext, **kwargs)
- def execute(self, selectcontext, instance, row, identitykey, isnew):
- self._get_context_strategy(selectcontext).process_row(selectcontext, instance, row, identitykey, isnew)
+ def create_row_processor(self, selectcontext, mapper, row):
+ return self._get_context_strategy(selectcontext).create_row_processor(selectcontext, mapper, row)
def do_init(self):
self._all_strategies = {}
def setup_query(self, context, **kwargs):
pass
- def process_row(self, selectcontext, instance, row, identitykey, isnew):
- pass
+ def create_row_processor(self, selectcontext, mapper, row):
+ """return row processing functions which fulfill the contract specified
+ by MapperProperty.create_row_processor.
+
+
+ StrategizedProperty delegates its create_row_processor method
+ directly to this method.
+ """
+ raise NotImplementedError()
# multiple columns that all reference a common parent column. it will also resolve the column
# against the "mapped_table" of this mapper.
primary_key = sql.ColumnCollection()
+ # TODO: this duplicates _get_inherited_column_equivalents() with a slightly
+ # different approach; pick one approach and remove the other.
equivs = {}
for col in (self.primary_key_argument or self.pks_by_table[self.mapped_table]):
if not len(col.foreign_keys):
if discriminator is not None:
mapper = self.polymorphic_map[discriminator]
if mapper is not self:
- if ('polymorphic_fetch', mapper, self.polymorphic_fetch) not in context.attributes:
- context.attributes[('polymorphic_fetch', mapper, self.polymorphic_fetch)] = (self, [t for t in mapper.tables if t not in self.tables])
+ if ('polymorphic_fetch', mapper) not in context.attributes:
+ context.attributes[('polymorphic_fetch', mapper)] = (self, [t for t in mapper.tables if t not in self.tables])
row = self.translate_row(mapper, row)
return mapper._instance(context, row, result=result, skip_polymorphic=True)
mapperutil.BinaryVisitor(visit_binary).traverse(cond)
return cond, param_names
- def _post_instance(self, context, instance):
- (hosted_mapper, needs_tables) = context.attributes.get(('polymorphic_fetch', self, 'select'), (None, None))
- if needs_tables is None or len(needs_tables) == 0:
- return
-
- # TODO: this logic needs to be merged with the same logic in DeferredColumnLoader
- self.__log_debug("Post query loading instance " + mapperutil.instance_str(instance))
- if ('post_select', self) not in context.attributes:
- cond, param_names = self._deferred_inheritance_condition(needs_tables)
- statement = sql.select(needs_tables, cond, use_labels=True)
- context.attributes[('post_select', self)] = (statement, param_names)
-
- (statement, binds) = context.attributes[('post_select', self)]
-
- identitykey = self.instance_key(instance)
-
- params = {}
- for c in binds:
- params[c.name] = self.get_attr_by_column(instance, c)
- row = context.session.connection(self).execute(statement, **params).fetchone()
- for prop in self.__props.values():
- if prop.parent is not hosted_mapper:
- prop.execute(context, instance, row, identitykey, True)
-
def translate_row(self, tomapper, row):
"""Translate the column keys of a row into a new or proxied
row that can be understood by another mapper.
return newrow
def populate_instance(self, selectcontext, instance, row, identitykey, isnew):
- """populate an instance from a result row.
+ """populate an instance from a result row."""
+
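+ # the row processing functions are built from the first row received and cached
+ # in the SelectionContext, keyed to this mapper, so that subsequent rows handled
+ # by this mapper reuse the same callables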
+ populators = selectcontext.attributes.get(('instance_populators', self), None)
+ if populators is None:
+ populators = []
+ post_processors = []
+ for prop in self.__props.values():
+ (pop, post_proc) = prop.create_row_processor(selectcontext, self, row)
+ if pop is not None:
+ populators.append(pop)
+ if post_proc is not None:
+ post_processors.append(post_proc)
+
+ poly_select_loader = self._get_poly_select_loader(selectcontext, row)
+ if poly_select_loader is not None:
+ post_processors.append(poly_select_loader)
+
+ selectcontext.attributes[('instance_populators', self)] = populators
+ selectcontext.attributes[('post_processors', self)] = post_processors
- This method iterates through the list of MapperProperty objects attached to this Mapper
- and calls each properties execute() method."""
+ for p in populators:
+ p(instance, row, identitykey, isnew)
+
+ if self.non_primary:
+ selectcontext.attributes[('populating_mapper', instance)] = self
- for prop in self.__props.values():
- prop.execute(selectcontext, instance, row, identitykey, isnew)
-
+ def _post_instance(self, selectcontext, instance):
+ post_processors = selectcontext.attributes[('post_processors', self)]
+ for p in post_processors:
+ p(instance)
+
+ def _get_poly_select_loader(self, selectcontext, row):
+ # applies when polymorphic_fetch is 'select', or is 'union' but the needed columns were not present in the row
+ (hosted_mapper, needs_tables) = selectcontext.attributes.get(('polymorphic_fetch', self), (None, None))
+ if hosted_mapper is None or len(needs_tables)==0 or hosted_mapper.polymorphic_fetch == 'deferred':
+ return
+
+ from strategies import ColumnLoader
+ from attributes import InstrumentedAttribute
+
+ cond, param_names = self._deferred_inheritance_condition(needs_tables)
+ statement = sql.select(needs_tables, cond, use_labels=True)
+ group = [p for p in self.props.values() if isinstance(p.strategy, ColumnLoader) and p.columns[0].table in needs_tables]
+
+ def post_execute(instance):
+ self.__log_debug("Post query loading instance " + mapperutil.instance_str(instance))
+
+ identitykey = self.instance_key(instance)
+
+ params = {}
+ for c in param_names:
+ params[c.name] = self.get_attr_by_column(instance, c)
+ row = selectcontext.session.connection(self).execute(statement, **params).fetchone()
+ for prop in group:
+ InstrumentedAttribute.get_instrument(instance, prop.key).set_committed_value(instance, row[prop.columns[0]])
+ return post_execute
+
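# Illustration only: a sketch of a mapping that exercises the post-select path
# above -- joined-table inheritance where the subclass table's columns are loaded
# by a second SELECT once the discriminator identifies the subtype.  Assumes the
# mapper() keyword is spelled the same as the polymorphic_fetch attribute
# consulted above.

from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
from sqlalchemy.orm import mapper

metadata = MetaData()
employees = Table('employees', metadata,
    Column('employee_id', Integer, primary_key=True),
    Column('name', String(50)),
    Column('type', String(30)))
engineers = Table('engineers', metadata,
    Column('employee_id', Integer, ForeignKey('employees.employee_id'), primary_key=True),
    Column('engineer_info', String(50)))

class Employee(object): pass
class Engineer(Employee): pass

mapper(Employee, employees, polymorphic_on=employees.c.type,
       polymorphic_identity='employee', polymorphic_fetch='select')
mapper(Engineer, engineers, inherits=Employee, polymorphic_identity='engineer')

# querying for Employee selects only from `employees`; rows whose discriminator
# is 'engineer' get a post_execute callable (built above) which issues a second
# SELECT against `engineers`, keyed on the instance's primary key columns.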
Mapper.logger = logging.class_logger(Mapper)
coltype = self.columns[0].type
sessionlib.attribute_manager.register_attribute(self.parent.class_, self.key, uselist=False, copy_function=coltype.copy_value, compare_function=coltype.compare_values, mutable_scalars=self.columns[0].type.is_mutable())
- def process_row(self, selectcontext, instance, row, identitykey, isnew):
- if isnew:
+ def create_row_processor(self, selectcontext, mapper, row):
+ if self.columns[0] in row:
+ def execute(instance, row, identitykey, isnew):
+ if isnew:
+ if self._should_log_debug:
+ self.logger.debug("populating %s with %s/%s" % (mapperutil.attribute_str(instance, self.key), row.__class__.__name__, self.columns[0].key))
+ instance.__dict__[self.key] = row[self.columns[0]]
+ self.logger.debug("Returning active column fetcher for %s %s" % (mapper, self.key))
+ return (execute, None)
+ else:
+ (hosted_mapper, needs_tables) = selectcontext.attributes.get(('polymorphic_fetch', mapper), (None, None))
+ if hosted_mapper is None:
+ return (None, None)
+
+ if hosted_mapper.polymorphic_fetch == 'deferred':
+ def execute(instance, row, identitykey, isnew):
+ sessionlib.attribute_manager.init_instance_attribute(instance, self.key, False, callable_=self._get_deferred_loader(instance, mapper, needs_tables))
+ self.logger.debug("Returning deferred column fetcher for %s %s" % (mapper, self.key))
+ return (execute, None)
+ else:
+ self.logger.debug("Returning no column fetcher for %s %s" % (mapper, self.key))
+ return (None, None)
+
+ def _get_deferred_loader(self, instance, mapper, needs_tables):
+ def load():
+ group = [p for p in mapper.props.values() if isinstance(p.strategy, ColumnLoader) and p.columns[0].table in needs_tables]
+
if self._should_log_debug:
- self.logger.debug("populating %s with %s/%s" % (mapperutil.attribute_str(instance, self.key), row.__class__.__name__, self.columns[0].key))
+ self.logger.debug("deferred load %s group %s" % (mapperutil.attribute_str(instance, self.key), group and ','.join([p.key for p in group]) or 'None'))
+
+ session = sessionlib.object_session(instance)
+ if session is None:
+ raise exceptions.InvalidRequestError("Parent instance %s is not bound to a Session; deferred load operation of attribute '%s' cannot proceed" % (instance.__class__, self.key))
+
+ cond, param_names = mapper._deferred_inheritance_condition(needs_tables)
+ statement = sql.select(needs_tables, cond, use_labels=True)
+ params = {}
+ for c in param_names:
+ params[c.name] = mapper.get_attr_by_column(instance, c)
+
+ result = session.execute(mapper, statement, params)
try:
- instance.__dict__[self.key] = row[self.columns[0]]
- except KeyError:
- if self._should_log_debug:
- self.logger.debug("degrade to deferred column on %s" % mapperutil.attribute_str(instance, self.key))
- self.parent_property._get_strategy(DeferredColumnLoader).process_row(selectcontext, instance, row, identitykey, isnew)
-
+ row = result.fetchone()
+ for prop in group:
+ InstrumentedAttribute.get_instrument(instance, prop.key).set_committed_value(instance, row[prop.columns[0]])
+ return InstrumentedAttribute.ATTR_WAS_SET
+ finally:
+ result.close()
+
+ return load
+
ColumnLoader.logger = logging.class_logger(ColumnLoader)
class DeferredColumnLoader(LoaderStrategy):
This is per-column lazy loading.
"""
+ def create_row_processor(self, selectcontext, mapper, row):
+ if not self.is_default or len(selectcontext.options):
+ def execute(instance, row, identitykey, isnew):
+ if not isnew:
+ return
+ if self._should_log_debug:
+ self.logger.debug("set deferred callable on %s" % mapperutil.attribute_str(instance, self.key))
+ sessionlib.attribute_manager.init_instance_attribute(instance, self.key, False, callable_=self.setup_loader(instance))
+ return (execute, None)
+ else:
+ def execute(instance, row, identitykey, isnew):
+ if not isnew:
+ return
+ if self._should_log_debug:
+ self.logger.debug("set deferred callable on %s" % mapperutil.attribute_str(instance, self.key))
+ sessionlib.attribute_manager.reset_instance_attribute(instance, self.key)
+ return (execute, None)
+
def init(self):
super(DeferredColumnLoader, self).init()
self.columns = self.parent_property.columns
def setup_query(self, context, **kwargs):
pass
+
- def process_row(self, selectcontext, instance, row, identitykey, isnew):
- if isnew:
- if not self.is_default or len(selectcontext.options):
- sessionlib.attribute_manager.init_instance_attribute(instance, self.key, False, callable_=self.setup_loader(instance, selectcontext))
- else:
- sessionlib.attribute_manager.reset_instance_attribute(instance, self.key)
-
- def setup_loader(self, instance, context=None):
+ def _load_deferred_tables(self, context, instance, loadtype):
+     # resolve the ColumnLoader properties whose tables require a post-fetch for this instance
+     localparent = mapper.object_mapper(instance)
+     (hosted_mapper, needs_tables) = context.attributes[('polymorphic_fetch', localparent, loadtype)]
+     group = [p for p in localparent.props.values() if isinstance(p.strategy, ColumnLoader) and p.columns[0].table in needs_tables]
+     return (hosted_mapper, needs_tables, group)
+
+ def setup_loader(self, instance):
localparent = mapper.object_mapper(instance, raiseerror=False)
if localparent is None:
return None
if prop is not self.parent_property:
return prop._get_strategy(DeferredColumnLoader).setup_loader(instance)
- if context is not None and ('polymorphic_fetch', localparent, 'deferred') in context.attributes:
- (hosted_mapper, needs_tables) = context.attributes[('polymorphic_fetch', localparent, 'deferred')]
- loadall = True
- else:
- loadall = False
-
- # clear context so it doesnt hang around attached to the instance
- context = None
-
def lazyload():
if not mapper.has_identity(instance):
return None
- if loadall:
- # TODO: this logic needs to be merged with the same logic in Mapper
- group = [p for p in localparent.props.values() if isinstance(p.strategy, ColumnLoader) and p.columns[0].table in needs_tables]
- elif self.group is not None:
+ if self.group is not None:
group = [p for p in localparent.props.values() if isinstance(p.strategy, DeferredColumnLoader) and p.group==self.group]
else:
group = None
if session is None:
raise exceptions.InvalidRequestError("Parent instance %s is not bound to a Session; deferred load operation of attribute '%s' cannot proceed" % (instance.__class__, self.key))
- if loadall:
- # TODO: this logic needs to be merged with the same logic in Mapper
- cond, param_names = localparent._deferred_inheritance_condition(needs_tables)
- statement = sql.select(needs_tables, cond, use_labels=True)
- params = {}
- for c in param_names:
- params[c.name] = localparent.get_attr_by_column(instance, c)
+ clause = localparent._get_clause
+ ident = instance._instance_key[1]
+ params = {}
+ for i, primary_key in enumerate(localparent.primary_key):
+ params[primary_key._label] = ident[i]
+ if group is not None:
+ statement = sql.select([p.columns[0] for p in group], clause, from_obj=[localparent.mapped_table], use_labels=True)
else:
- clause = localparent._get_clause
- ident = instance._instance_key[1]
- params = {}
- for i, primary_key in enumerate(localparent.primary_key):
- params[primary_key._label] = ident[i]
- if group is not None:
- statement = sql.select([p.columns[0] for p in group], clause, from_obj=[localparent.mapped_table], use_labels=True)
- else:
- statement = sql.select([self.columns[0]], clause, from_obj=[localparent.mapped_table], use_labels=True)
+ statement = sql.select([self.columns[0]], clause, from_obj=[localparent.mapped_table], use_labels=True)
if group is not None:
result = session.execute(localparent, statement, params)
def init_class_attribute(self):
self._register_attribute(self.parent.class_)
- def process_row(self, selectcontext, instance, row, identitykey, isnew):
- if isnew:
- if not self.is_default or len(selectcontext.options):
- if self._should_log_debug:
- self.logger.debug("set instance-level no loader on %s" % mapperutil.attribute_str(instance, self.key))
- self._init_instance_attribute(instance)
+ def create_row_processor(self, selectcontext, mapper, row):
+ if not self.is_default or len(selectcontext.options):
+ def execute(instance, row, identitykey, isnew):
+ if isnew:
+ if self._should_log_debug:
+ self.logger.debug("set instance-level no loader on %s" % mapperutil.attribute_str(instance, self.key))
+ self._init_instance_attribute(instance)
+ return (execute, None)
+ else:
+ return (None, None)
NoLoader.logger = logging.class_logger(NoLoader)
return None
return lazyload
- def process_row(self, selectcontext, instance, row, identitykey, isnew):
- if isnew:
- # new object instance being loaded from a result row
- if not self.is_default or len(selectcontext.options):
- self.logger.debug("set instance-level lazy loader on %s" % mapperutil.attribute_str(instance, self.key))
- # we are not the primary manager for this attribute on this class - set up a per-instance lazyloader,
- # which will override the clareset_instance_attributess-level behavior
- self._init_instance_attribute(instance, callable_=self.setup_loader(instance, selectcontext.options))
- else:
- self.logger.debug("set class-level lazy loader on %s" % mapperutil.attribute_str(instance, self.key))
- # we are the primary manager for this attribute on this class - reset its per-instance attribute state,
- # so that the class-level lazy loader is executed when next referenced on this instance.
- # this usually is not needed unless the constructor of the object referenced the attribute before we got
- # to load data into it.
- sessionlib.attribute_manager.reset_instance_attribute(instance, self.key)
+
+ def create_row_processor(self, selectcontext, mapper, row):
+ if not self.is_default or len(selectcontext.options):
+ def execute(instance, row, identitykey, isnew):
+ if isnew:
+ if self._should_log_debug:
+ self.logger.debug("set instance-level lazy loader on %s" % mapperutil.attribute_str(instance, self.key))
+ # we are not the primary manager for this attribute on this class - set up a per-instance lazyloader,
+ # which will override the class-level behavior
+ self._init_instance_attribute(instance, callable_=self.setup_loader(instance, selectcontext.options))
+ return (execute, None)
+ else:
+ def execute(instance, row, identitykey, isnew):
+ if isnew:
+ if self._should_log_debug:
+ self.logger.debug("set class-level lazy loader on %s" % mapperutil.attribute_str(instance, self.key))
+ # we are the primary manager for this attribute on this class - reset its per-instance attribute state,
+ # so that the class-level lazy loader is executed when next referenced on this instance.
+ # this usually is not needed unless the constructor of the object referenced the attribute before we got
+ # to load data into it.
+ sessionlib.attribute_manager.reset_instance_attribute(instance, self.key)
+ return (execute, None)
def _create_lazy_clause(cls, prop, reverse_direction=False):
(primaryjoin, secondaryjoin, remote_side) = (prop.polymorphic_primaryjoin, prop.polymorphic_secondaryjoin, prop.remote_side)
class EagerRowAdapter(object):
def __init__(self, row):
self.row = row
+ def __contains__(self, key):
+ return self.has_key(key)
def has_key(self, key):
return map.has_key(key) or self.row.has_key(key)
def __getitem__(self, key):
for value in self.select_mapper.props.values():
value.setup(context, eagertable=clauses.eagertarget, parentclauses=clauses, parentmapper=self.select_mapper)
- def _create_row_processor(self, selectcontext, row):
- """Create a *row processing* function that will apply eager
+ def _create_row_decorator(self, selectcontext, row):
+ """Create a *row decorating* function that will apply eager
aliasing to the row.
Also check that an identity key can be retrieved from the row,
self.logger.debug("could not locate identity key from row '%s'; missing column '%s'" % (repr(decorated_row), str(k)))
return None
- def process_row(self, selectcontext, instance, row, identitykey, isnew):
- """Receive a row.
- Tell our mapper to look for a new object instance in the row,
- and attach it to a list on the parent instance.
- """
-
- if self in selectcontext.recursion_stack:
- return
-
- try:
- # check for row processor
- row_processor = selectcontext.attributes[id(self)]
- except KeyError:
- # create a row processor function and cache it in the context
- row_processor = self._create_row_processor(selectcontext, row)
- selectcontext.attributes[id(self)] = row_processor
-
- if row_processor is not None:
- decorated_row = row_processor(row)
+ def create_row_processor(self, selectcontext, mapper, row):
+ row_decorator = self._create_row_decorator(selectcontext, row)
+ if row_decorator is not None:
+ def execute(instance, row, identitykey, isnew):
+ if self in selectcontext.recursion_stack:
+ return
+ decorated_row = row_decorator(row)
+
+ # TODO: recursion check a speed hit...? try to get a "termination point" into the AliasedClauses
+ # or EagerRowAdapter ?
+ selectcontext.recursion_stack.add(self)
+ try:
+ if not self.uselist:
+ if self._should_log_debug:
+ self.logger.debug("eagerload scalar instance on %s" % mapperutil.attribute_str(instance, self.key))
+ if isnew:
+ # set a scalar object instance directly on the parent object,
+ # bypassing InstrumentedAttribute event handlers.
+ instance.__dict__[self.key] = self.mapper._instance(selectcontext, decorated_row, None)
+ else:
+ # call _instance on the row, even though the object has been created,
+ # so that we further descend into properties
+ self.mapper._instance(selectcontext, decorated_row, None)
+ else:
+ if isnew:
+ if self._should_log_debug:
+ self.logger.debug("initialize UniqueAppender on %s" % mapperutil.attribute_str(instance, self.key))
+
+ # call the InstrumentedAttribute's initialize() method to create a new, blank list
+ l = InstrumentedAttribute.get_instrument(instance, self.key).initialize(instance)
+
+ # create an appender object which will add set-like semantics to the list
+ appender = util.UniqueAppender(l.data)
+
+ # store it in the "scratch" area, which is local to this load operation.
+ selectcontext.attributes[(instance, self.key)] = appender
+ result_list = selectcontext.attributes[(instance, self.key)]
+ if self._should_log_debug:
+ self.logger.debug("eagerload list instance on %s" % mapperutil.attribute_str(instance, self.key))
+ self.mapper._instance(selectcontext, decorated_row, result_list)
+ finally:
+ selectcontext.recursion_stack.remove(self)
+ return (execute, None)
else:
- # row_processor was None: degrade to a lazy loader
- if self._should_log_debug:
- self.logger.debug("degrade to lazy loader on %s" % mapperutil.attribute_str(instance, self.key))
- self.parent_property._get_strategy(LazyLoader).process_row(selectcontext, instance, row, identitykey, isnew)
- return
+ self.logger.debug("eager loader %s degrading to lazy loader" % str(self))
+ return self.parent_property._get_strategy(LazyLoader).create_row_processor(selectcontext, mapper, row)
- # TODO: recursion check a speed hit...? try to get a "termination point" into the AliasedClauses
- # or EagerRowAdapter ?
- selectcontext.recursion_stack.add(self)
- try:
- if not self.uselist:
- if self._should_log_debug:
- self.logger.debug("eagerload scalar instance on %s" % mapperutil.attribute_str(instance, self.key))
- if isnew:
- # set a scalar object instance directly on the parent object,
- # bypassing InstrumentedAttribute event handlers.
- instance.__dict__[self.key] = self.mapper._instance(selectcontext, decorated_row, None)
- else:
- # call _instance on the row, even though the object has been created,
- # so that we further descend into properties
- self.mapper._instance(selectcontext, decorated_row, None)
- else:
- if isnew:
- if self._should_log_debug:
- self.logger.debug("initialize UniqueAppender on %s" % mapperutil.attribute_str(instance, self.key))
-
- # call the InstrumentedAttribute's initialize() method to create a new, blank list
- l = InstrumentedAttribute.get_instrument(instance, self.key).initialize(instance)
-
- # create an appender object which will add set-like semantics to the list
- appender = util.UniqueAppender(l.data)
-
- # store it in the "scratch" area, which is local to this load operation.
- selectcontext.attributes[(instance, self.key)] = appender
- result_list = selectcontext.attributes[(instance, self.key)]
- if self._should_log_debug:
- self.logger.debug("eagerload list instance on %s" % mapperutil.attribute_str(instance, self.key))
- self.mapper._instance(selectcontext, decorated_row, result_list)
- finally:
- selectcontext.recursion_stack.remove(self)
-
EagerLoader.logger = logging.class_logger(EagerLoader)
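# Illustration only: the eager loading this strategy implements, seen from the
# user side.  eagerload() (or lazy=False on the relation) pulls the related rows
# into the parent SELECT via a LEFT OUTER JOIN; the execute callable above then
# builds the child instances from the aliased columns of each row.

from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey, create_engine
from sqlalchemy.orm import mapper, relation, create_session, eagerload

metadata = MetaData()
users = Table('users', metadata,
    Column('user_id', Integer, primary_key=True),
    Column('name', String(50)))
addresses = Table('addresses', metadata,
    Column('address_id', Integer, primary_key=True),
    Column('user_id', Integer, ForeignKey('users.user_id')),
    Column('email', String(100)))

class User(object): pass
class Address(object): pass

mapper(Address, addresses)
mapper(User, users, properties={'addresses': relation(Address)})

engine = create_engine('sqlite://')
metadata.create_all(engine)
session = create_session(bind=engine)

# one SELECT with a LEFT OUTER JOIN to addresses; each User.addresses collection
# comes back already populated, with no further SQL emitted on access
result = session.query(User).options(eagerload('addresses')).all()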
class EagerLazyOption(StrategizedOption):