- Fixed bug in session.merge() which prevented dict-like
collections from merging.
+ - For those who might use debug logging on
+ sqlalchemy.orm.strategies, most logging calls during row
+ loading have been removed. These were never very helpful
+ and cluttered up the code.
+
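
For reference, per-module debug output like this is enabled through the stdlib logging module; after this change the row-loading code simply emits nothing at that level:

    import logging

    # send log records to stderr
    logging.basicConfig()
    # after this change, row loading emits nothing at this level
    logging.getLogger('sqlalchemy.orm.strategies').setLevel(logging.DEBUG)
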
+ - Some internal streamlining of object loading grants a
+ small speedup for large result sets, estimated at
+ around 10-15%.
+
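
A minimal sketch of how that estimate could be reproduced locally; the Person mapping is hypothetical, and the figure will vary with row width and mapper configuration:

    import time
    from sqlalchemy.orm import create_session

    def best_load_time(trials=5):
        # time a large ORM load a few times and keep the best run;
        # compare the number before and after this change
        best = None
        for _ in range(trials):
            session = create_session()
            start = time.time()
            session.query(Person).all()    # hypothetical mapped class
            elapsed = time.time() - start
            session.close()
            if best is None or elapsed < best:
                best = elapsed
        return best
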
- sql
- Added math negation operator support, -x.
self.new_init = None
def _create_instance_state(self, instance):
- global state
- if state is None:
- from sqlalchemy.orm import state
if self.mutable_attributes:
return state.MutableAttrInstanceState(instance, self)
else:
def add(self, state):
if state.key in self:
if dict.__getitem__(self, state.key) is not state:
- raise AssertionError("A conflicting state is already present in the identity map for key %r" % (state.key, ))
+ raise AssertionError("A conflicting state is already "
+ "present in the identity map for key %r"
+ % (state.key, ))
else:
dict.__setitem__(self, state.key, state)
self._manage_incoming_state(state)
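
The conflict check above is the core of the identity map contract: re-adding the same state is a no-op, while a different state under the same key is a hard error. A stripped-down sketch of that contract, minus the weak-referencing and mutation tracking of the real class:

    class MinimalIdentityMap(dict):
        def add(self, state):
            if state.key in self:
                # the same state again is harmless; a different state
                # under the same identity key signals corrupted bookkeeping
                if dict.__getitem__(self, state.key) is not state:
                    raise AssertionError(
                        "A conflicting state is already present "
                        "in the identity map for key %r" % (state.key,))
            else:
                dict.__setitem__(self, state.key, state)
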
pass
def create_row_processor(self, selectcontext, path, mapper, row, adapter):
- """Return a 2-tuple consiting of two row processing functions and an instance post-processing function.
+ """Return a 2-tuple consiting of two row processing functions and
+ an instance post-processing function.
Input arguments are the query.SelectionContext and the *first*
applicable row of a result set obtained within
Callables are of the following form::
- def new_execute(state, dict_, row, **flags):
+ def new_execute(state, dict_, row, isnew):
# process incoming instance state and given row. the instance is
# "new" and was just created upon receipt of this row.
- # flags is a dictionary containing at least the following
- # attributes:
- # isnew - indicates if the instance was newly created as a
- # result of reading this row
- # instancekey - identity key of the instance
+ "isnew" indicates if the instance was newly created as a
+ result of reading this row
- def existing_execute(state, dict_, row, **flags):
+ def existing_execute(state, dict_, row):
# process incoming instance state and given row. the instance is
# "existing" and was created based on a previous row.
self._inherits_equated_pairs = None
if allow_null_pks:
- util.warn_deprecated('the allow_null_pks option to Mapper() is deprecated. It is now on in all cases.')
+ util.warn_deprecated('the allow_null_pks option to Mapper() is '
+ 'deprecated. It is now on in all cases.')
if with_polymorphic == '*':
self.with_polymorphic = ('*', None)
if isinstance(self.local_table, expression._SelectBaseMixin):
raise sa_exc.InvalidRequestError(
- "When mapping against a select() construct, map against an alias() of the construct instead."
- "This because several databases don't allow a SELECT from a subquery that does not have an alias."
+ "When mapping against a select() construct, map against "
+ "an alias() of the construct instead."
+ "This because several databases don't allow a "
+ "SELECT from a subquery that does not have an alias."
)
- if self.with_polymorphic and isinstance(self.with_polymorphic[1], expression._SelectBaseMixin):
+ if self.with_polymorphic and \
+ isinstance(self.with_polymorphic[1], expression._SelectBaseMixin):
self.with_polymorphic = (self.with_polymorphic[0], self.with_polymorphic[1].alias())
# our 'polymorphic identity', a string name that when located in a result set row
finally:
_COMPILE_MUTEX.release()
- # configurational / mutating methods. not threadsafe
- # except for compile().
-
def _configure_inheritance(self):
"""Configure settings related to inherting and/or inherited mappers being present."""
# the order of mapper compilation
for mapper in list(_mapper_registry):
if getattr(mapper, '_compile_failed', False):
- raise sa_exc.InvalidRequestError("One or more mappers failed to compile. Exception was probably "
+ raise sa_exc.InvalidRequestError(
+ "One or more mappers failed to compile. "
+ "Exception was probably "
"suppressed within a hasattr() call. "
"Message was: %s" % mapper._compile_failed)
if not mapper.compiled:
self._configure_property(key, prop, init=self.compiled)
- # class formatting / logging.
-
- def _log(self, msg):
- if self._should_log_info:
- self.logger.info(
- "(" + self.class_.__name__ +
- "|" +
- (self.local_table is not None and
- self.local_table.description or
- str(self.local_table)) +
- (self.non_primary and "|non-primary" or "") + ") " +
- msg)
-
- def _log_debug(self, msg):
- if self._should_log_debug:
- self.logger.debug(
- "(" + self.class_.__name__ +
- "|" +
- (self.local_table is not None and
- self.local_table.description
- or str(self.local_table)) +
- (self.non_primary and "|non-primary" or "") + ") " +
- msg)
+ def _log(self, msg, *args):
+ self.logger.info(
+ "(" + self.class_.__name__ +
+ "|" +
+ (self.local_table is not None and
+ self.local_table.description or
+ str(self.local_table)) +
+ (self.non_primary and "|non-primary" or "") + ") " +
+ msg, *args)
+
+ def _log_debug(self, msg, *args):
+ self.logger.debug(
+ "(" + self.class_.__name__ +
+ "|" +
+ (self.local_table is not None and
+ self.local_table.description
+ or str(self.local_table)) +
+ (self.non_primary and "|non-primary" or "") + ") " +
+ msg, *args)
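
Forwarding *args through to logger.info()/logger.debug() instead of %-interpolating up front defers the string formatting to the logging module, which only performs it when a handler actually emits the record. Note that the arguments themselves are still evaluated, which is why the hot paths below also keep their _should_log_debug guards:

    import logging

    log = logging.getLogger('example')
    count, table = 500, 'users'

    # eager: the %-interpolation runs on every call, even when DEBUG
    # is disabled and the record is discarded
    log.debug("loaded %s rows from %s" % (count, table))

    # lazy: the format string and arguments are stored on the record
    # and interpolated only if the record is actually emitted
    log.debug("loaded %s rows from %s", count, table)
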
def __repr__(self):
return '<Mapper at 0x%x; %s>' % (
id(self), self.class_.__name__)
def __str__(self):
- if self.local_table is not None:
- tabledesc = self.local_table.description
- else:
- tabledesc = None
return "Mapper|%s|%s%s" % (
self.class_.__name__,
- tabledesc,
+ self.local_table is not None and self.local_table.description or None,
self.non_primary and "|non-primary" or ""
)
- # informational / status
-
def _is_orphan(self, state):
o = False
for mapper in self.iterate_to_root():
except StopIteration:
visitables.pop()
- # persistence
-
@util.memoized_property
def _sorted_tables(self):
table_to_mapper = {}
raise orm_exc.FlushError(
"New instance %s with identity key %s conflicts with persistent instance %s" %
(state_str(state), instance_key, state_str(existing)))
+
if self._should_log_debug:
self._log_debug(
"detected row switch for identity %s. will update %s, remove %s from "
- "transaction" % (instance_key, state_str(state), state_str(existing)))
+ "transaction", instance_key, state_str(state), state_str(existing))
# remove the "delete" flag from the existing element
uowtransaction.set_row_switch(existing)
pks = mapper._pks_by_table[table]
if self._should_log_debug:
- self._log_debug("_save_obj() table '%s' instance %s identity %s" %
- (table.name, state_str(state), str(instance_key)))
+ self._log_debug("_save_obj() table '%s' instance %s identity %s",
+ table.name, state_str(state), str(instance_key))
isinsert = not has_identity and not postupdate and state not in row_switches
for col in mapper._cols_by_table[table]:
if col is mapper.version_id_col:
params[col.key] = 1
- elif mapper.polymorphic_on is not None and mapper.polymorphic_on.shares_lineage(col):
- if self._should_log_debug:
- self._log_debug(
- "Using polymorphic identity '%s' for insert column '%s'" %
- (mapper.polymorphic_identity, col.key))
+ elif mapper.polymorphic_on is not None and \
+ mapper.polymorphic_on.shares_lineage(col):
value = mapper.polymorphic_identity
if ((col.default is None and
col.server_default is None) or
for dep in self._props.values() + self._dependency_processors:
dep.register_processors(uowcommit)
- # result set conversion
-
- def _instance_processor(self, context, path, adapter, polymorphic_from=None, extension=None, only_load_props=None, refresh_state=None, polymorphic_discriminator=None):
- """Produce a mapper level row processor callable which processes rows into mapped instances."""
+ def _instance_processor(self, context, path, adapter,
+ polymorphic_from=None, extension=None,
+ only_load_props=None, refresh_state=None,
+ polymorphic_discriminator=None):
+
+ """Produce a mapper level row processor callable
+ which processes rows into mapped instances."""
pk_cols = self.primary_key
polymorphic_on = polymorphic_discriminator
else:
polymorphic_on = self.polymorphic_on
- polymorphic_instances = util.PopulateDict(self._configure_subclass_mapper(context, path, adapter))
+ polymorphic_instances = util.PopulateDict(
+ self._configure_subclass_mapper(context, path, adapter)
+ )
version_id_col = self.version_id_col
new_populators = []
existing_populators = []
-
- def populate_state(state, dict_, row, isnew, only_load_props, **flags):
+ load_path = context.query._current_path + path
+
+ def populate_state(state, dict_, row, isnew, only_load_props):
if isnew:
if context.propagate_options:
state.load_options = context.propagate_options
if state.load_options:
- state.load_path = context.query._current_path + path
+ state.load_path = load_path
if not new_populators:
- new_populators[:], existing_populators[:] = self._populators(context, path, row, adapter)
+ new_populators[:], existing_populators[:] = \
+ self._populators(context, path, row, adapter)
if isnew:
populators = new_populators
populators = [p for p in populators if p[0] in only_load_props]
for key, populator in populators:
- populator(state, dict_, row, isnew=isnew, **flags)
+ populator(state, dict_, row, isnew)
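
new_populators and existing_populators start as empty lists captured by the closure; slice assignment (list[:] = ...) fills them in place on the first row, so _populators() runs exactly once per query while every later call sees the resolved lists. The general shape of that trick:

    def make_row_processor(resolve_handlers):
        handlers = []    # shared mutable cell, captured by the closure

        def process(row):
            if not handlers:
                # slice-assign mutates the captured list itself, so the
                # (possibly expensive) resolution happens only once
                handlers[:] = resolve_handlers(row)
            for handler in handlers:
                handler(row)

        return process
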
session_identity_map = context.session.identity_map
state = attributes.instance_state(instance)
dict_ = attributes.instance_dict(instance)
- if self._should_log_debug:
- self._log_debug("_instance(): using existing instance %s identity %s" %
- (instance_str(instance), identitykey))
-
isnew = state.runid != context.runid
currentload = not isnew
loaded_instance = False
- if not currentload and version_id_col is not None and context.version_check and \
- self._get_state_attr_by_column(state, self.version_id_col) != row[version_id_col]:
+ if not currentload and \
+ version_id_col is not None and \
+ context.version_check and \
+ self._get_state_attr_by_column(
+ state,
+ self.version_id_col) != row[version_id_col]:
+
raise orm_exc.ConcurrentModificationError(
"Instance '%s' version of %s does not match %s"
- % (state_str(state), self._get_state_attr_by_column(state, self.version_id_col), row[version_id_col]))
+ % (state_str(state),
+ self._get_state_attr_by_column(state, self.version_id_col),
+ row[version_id_col]))
elif refresh_state:
# out of band refresh_state detected (i.e. it's not in the session.identity_map)
# honor it anyway. this can happen if a _get() occurs within save_obj(), such as
currentload = True
loaded_instance = False
else:
- if self._should_log_debug:
- self._log_debug("_instance(): identity key %s not in session" % (identitykey,))
-
# check for non-NULL values in the primary key columns,
# else no entity is returned for the row
if _none_set.issuperset(identitykey[1]):
else:
instance = self.class_manager.new_instance()
- if self._should_log_debug:
- self._log_debug("_instance(): created new instance %s identity %s" %
- (instance_str(instance), identitykey))
-
dict_ = attributes.instance_dict(instance)
state = attributes.instance_state(instance)
state.key = identitykey
if not populate_instance or \
populate_instance(self, context, row, instance,
- only_load_props=only_load_props, instancekey=identitykey, isnew=isnew) is EXT_CONTINUE:
+ only_load_props=only_load_props,
+ instancekey=identitykey, isnew=isnew) is EXT_CONTINUE:
populate_state(state, dict_, row, isnew, only_load_props)
else:
else:
isnew = True
attrs = state.unloaded
- context.partials[state] = (dict_, attrs) #<-- allow query.instances to commit the subset of attrs
+ # allow query.instances to commit the subset of attrs
+ context.partials[state] = (dict_, attrs)
if not populate_instance or \
populate_instance(self, context, row, instance,
- only_load_props=attrs, instancekey=identitykey, isnew=isnew) is EXT_CONTINUE:
- populate_state(state, dict_, row, isnew, attrs, instancekey=identitykey)
+ only_load_props=attrs,
+ instancekey=identitykey, isnew=isnew) is EXT_CONTINUE:
+ populate_state(state, dict_, row, isnew, attrs)
if loaded_instance:
state._run_on_load(instance)
if result is not None and \
(not append_result or
- append_result(self, context, row, instance, result, instancekey=identitykey, isnew=isnew) is EXT_CONTINUE):
+ append_result(self, context, row, instance,
+ result, instancekey=identitykey, isnew=isnew)
+ is EXT_CONTINUE):
result.append(instance)
return instance
if mapper.inherits and not mapper.concrete:
statement = mapper._optimized_get_statement(state, attribute_names)
if statement is not None:
- result = session.query(mapper).from_statement(statement)._get(None, only_load_props=attribute_names, refresh_state=state)
+ result = session.query(mapper).from_statement(statement).\
+ _get(None,
+ only_load_props=attribute_names,
+ refresh_state=state)
if result is False:
if has_key:
identity_key = state.key
else:
identity_key = mapper._identity_key_from_state(state)
- result = session.query(mapper)._get(identity_key, refresh_state=state, only_load_props=attribute_names)
+ result = session.query(mapper)._get(
+ identity_key,
+ refresh_state=state,
+ only_load_props=attribute_names)
- # if instance is pending, a refresh operation may not complete (even if PK attributes are assigned)
+ # if instance is pending, a refresh operation
+ # may not complete (even if PK attributes are assigned)
if has_key and result is None:
raise orm_exc.ObjectDeletedError("Instance '%s' has been deleted." % state_str(state))
ATTR_WAS_SET
from sqlalchemy.orm import attributes, exc as orm_exc, interfaces
+import sys
+
+# hand this module to ``attributes`` once at import time, replacing the
+# per-call ``from sqlalchemy.orm import state`` that resolved the same
+# circular dependency inside _create_instance_state()
+attributes.state = sys.modules['sqlalchemy.orm.state']
+
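
The same technique in isolation, with two hypothetical modules: a module object is registered in sys.modules before its body finishes executing, so it can safely hand a reference to itself across an import cycle:

    # pkg/alpha.py -- needs names from beta, but only at call time
    def describe(obj):
        return beta.LABEL % (obj,)    # ``beta`` is injected below

    # pkg/beta.py -- imports alpha normally, then closes the cycle
    import sys
    from pkg import alpha

    LABEL = "object: %s"
    alpha.beta = sys.modules['pkg.beta']
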
class InstanceState(object):
"""tracks state information at the instance level."""
insert_order = None
mutable_dict = None
_strong_obj = None
+ modified = False
+ expired = False
def __init__(self, obj, manager):
self.class_ = obj.__class__
self.manager = manager
self.obj = weakref.ref(obj, self._cleanup)
- self.modified = False
- self.callables = {}
- self.expired = False
- self.committed_state = {}
- self.pending = {}
- self.parents = {}
+
+ @util.memoized_property
+ def committed_state(self):
+ return {}
+
+ @util.memoized_property
+ def parents(self):
+ return {}
+
+ @util.memoized_property
+ def pending(self):
+ return {}
+
+ @util.memoized_property
+ def callables(self):
+ return {}
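
The speedup comes from no longer building four dictionaries per instance up front; util.memoized_property is a non-data descriptor that computes the value on first access and stores it in the instance __dict__, after which ordinary attribute lookup bypasses the descriptor entirely. A minimal sketch of the mechanism:

    class memoized_property(object):
        """Compute an attribute once per instance, then cache it (sketch)."""

        def __init__(self, fget):
            self.fget = fget
            self.__name__ = fget.__name__

        def __get__(self, obj, cls):
            if obj is None:
                return self
            # having no __set__ makes this a non-data descriptor, so the
            # cached instance attribute shadows it on every later access
            obj.__dict__[self.__name__] = result = self.fget(obj)
            return result

This also explains the __getstate__ change below: a state that never touched, say, .callables has no such key in its __dict__ at all, hence the switch to .get().
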
def detach(self):
if self.session_id:
def _run_on_load(self, instance):
self.manager.events.run('on_load', instance)
-
+
def __getstate__(self):
d = {
'instance':self.obj(),
(k, self.__dict__[k]) for k in (
'committed_state', 'pending', 'parents', 'modified', 'expired',
'callables'
- ) if self.__dict__[k]
+ ) if self.__dict__.get(k, False)
)
d.update(
def init_class_attribute(self, mapper):
self.is_class_level = True
coltype = self.columns[0].type
- active_history = self.columns[0].primary_key # TODO: check all columns ? check for foreign Key as well?
+ # TODO: check all columns? check for foreign key as well?
+ active_history = self.columns[0].primary_key
_register_attribute(self, mapper, useobject=False,
compare_function=coltype.compare_values,
key, col = self.key, self.columns[0]
if adapter:
col = adapter.columns[col]
+
if col is not None and col in row:
- def new_execute(state, dict_, row, **flags):
+ def new_execute(state, dict_, row, isnew):
dict_[key] = row[col]
-
- if self._should_log_debug:
- new_execute = self.debug_callable(new_execute, self.logger,
- "%s returning active column fetcher" % self,
- lambda state, dict_, row, **flags: "%s populating %s" % \
- (self,
- mapperutil.state_attribute_str(state, key))
- )
- return (new_execute, None)
else:
- def new_execute(state, dict_, row, isnew, **flags):
+ def new_execute(state, dict_, row, isnew):
if isnew:
state.expire_attributes([key])
- if self._should_log_debug:
- self.logger.debug("%s deferring load", self)
- return (new_execute, None)
+ return new_execute, None
log.class_logger(ColumnLoader)
)
def create_row_processor(self, selectcontext, path, mapper, row, adapter):
- key, columns, composite_class = self.key, self.columns, self.parent_property.composite_class
+ key = self.key
+ columns = self.columns
+ composite_class = self.parent_property.composite_class
if adapter:
columns = [adapter.columns[c] for c in columns]
+
for c in columns:
if c not in row:
- def new_execute(state, dict_, row, isnew, **flags):
+ def new_execute(state, dict_, row, isnew):
if isnew:
state.expire_attributes([key])
- if self._should_log_debug:
- self.logger.debug("%s deferring load", self)
- return (new_execute, None)
+ break
else:
- def new_execute(state, dict_, row, **flags):
+ def new_execute(state, dict_, row, isnew):
dict_[key] = composite_class(*[row[c] for c in columns])
- if self._should_log_debug:
- new_execute = self.debug_callable(new_execute, self.logger,
- "%s returning active composite column fetcher" % self,
- lambda state, dict_, row, **flags: "populating %s" % \
- (mapperutil.state_attribute_str(state, key))
- )
-
- return (new_execute, None)
+ return new_execute, None
log.class_logger(CompositeColumnLoader)
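
The break above relies on Python's for/else: the else suite runs only when the loop completes without breaking, so the eager composite processor is built exactly when every column was present in the row. The construct in isolation:

    def check_columns(columns, row):
        for c in columns:
            if c not in row:
                print "missing %r; defer the composite" % (c,)
                break
        else:
            # runs only if the loop never hit ``break``
            print "all columns present; load the composite eagerly"
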
if adapter:
col = adapter.columns[col]
if col in row:
- return self.parent_property._get_strategy(ColumnLoader).create_row_processor(selectcontext, path, mapper, row, adapter)
+ return self.parent_property._get_strategy(ColumnLoader).\
+ create_row_processor(
+ selectcontext, path, mapper, row, adapter)
elif not self.is_class_level:
- def new_execute(state, dict_, row, **flags):
+ def new_execute(state, dict_, row, isnew):
state.set_callable(self.key, LoadDeferredColumns(state, self.key))
else:
- def new_execute(state, dict_, row, **flags):
+ def new_execute(state, dict_, row, isnew):
# reset state on the key so that deferred callables
# fire off on next access.
state.reset(self.key, dict_)
- if self._should_log_debug:
- new_execute = self.debug_callable(new_execute, self.logger, None,
- lambda state, dict_, row, **flags: "set deferred callable on %s" % \
- mapperutil.state_attribute_str(state, self.key)
- )
- return (new_execute, None)
+ return new_execute, None
def init(self):
if hasattr(self.parent_property, 'composite_class'):
if strategy._should_log_debug:
strategy.logger.debug(
- "deferred load %s group %s" %
- (mapperutil.state_attribute_str(state, self.key), group and ','.join(group) or 'None')
- )
+ "deferred load %s group %s",
+ (mapperutil.state_attribute_str(state, self.key),
+ group and ','.join(group) or 'None')
+ )
session = sessionlib._state_session(state)
if session is None:
)
def create_row_processor(self, selectcontext, path, mapper, row, adapter):
- def new_execute(state, dict_, row, **flags):
+ def new_execute(state, dict_, row, isnew):
self._init_instance_attribute(state)
-
- if self._should_log_debug:
- new_execute = self.debug_callable(new_execute, self.logger, None,
- lambda state, dict_, row, **flags: "initializing blank scalar/collection on %s" % \
- mapperutil.state_attribute_str(state, self.key)
- )
- return (new_execute, None)
+ return new_execute, None
log.class_logger(NoLoader)
def create_row_processor(self, selectcontext, path, mapper, row, adapter):
if not self.is_class_level:
- def new_execute(state, dict_, row, **flags):
+ def new_execute(state, dict_, row, isnew):
# we are not the primary manager for this attribute on this class - set up a
# per-instance lazyloader, which will override the class-level behavior.
# this currently only happens when using a "lazyload" option on a "no load"
# attribute - "eager" attributes always have a class-level lazyloader
# installed.
self._init_instance_attribute(state, callable_=LoadLazyAttribute(state, self.key))
-
- if self._should_log_debug:
- new_execute = self.debug_callable(new_execute, self.logger, None,
- lambda state, dict_, row, **flags: "set instance-level lazy loader on %s" % \
- mapperutil.state_attribute_str(state,
- self.key)
- )
-
- return (new_execute, None)
else:
- def new_execute(state, dict_, row, **flags):
+ def new_execute(state, dict_, row, isnew):
# we are the primary manager for this attribute on this class - reset its
# per-instance attribute state, so that the class-level lazy loader is
# executed when next referenced on this instance. this is needed in
# populate_existing() types of scenarios to reset any existing state.
state.reset(self.key, dict_)
- if self._should_log_debug:
- new_execute = self.debug_callable(new_execute, self.logger, None,
- lambda state, dict_, row, **flags: "set class-level lazy loader on %s" % \
- mapperutil.state_attribute_str(state,
- self.key)
- )
-
- return (new_execute, None)
+ return new_execute, None
def _create_lazy_clause(cls, prop, reverse_direction=False):
binds = util.column_dict()
if kw.get('passive') is attributes.PASSIVE_NO_FETCH and not strategy.use_get:
return attributes.PASSIVE_NO_RESULT
-
- if strategy._should_log_debug:
- strategy.logger.debug("loading %s" % mapperutil.state_attribute_str(state, self.key))
+ if strategy._should_log_debug:
+ strategy.logger.debug("loading %s",
+ mapperutil.state_attribute_str(state, self.key))
+
session = sessionlib._state_session(state)
if session is None:
raise orm_exc.DetachedInstanceError(
ident = []
allnulls = True
for primary_key in prop.mapper.primary_key:
- val = instance_mapper._get_committed_state_attr_by_column(state, strategy._equated_columns[primary_key], **kw)
+ val = instance_mapper._get_committed_state_attr_by_column(
+ state, strategy._equated_columns[primary_key], **kw)
if val is attributes.PASSIVE_NO_RESULT:
return val
allnulls = allnulls and val is None
def init_class_attribute(self, mapper):
self.parent_property._get_strategy(LazyLoader).init_class_attribute(mapper)
- def setup_query(self, context, entity, path, adapter, column_collection=None, parentmapper=None, **kwargs):
+ def setup_query(self, context, entity, path, adapter,
+ column_collection=None, parentmapper=None, **kwargs):
"""Add a left outer join to the statement thats being constructed."""
if not context.query._enable_eagerloads:
elif ("eager_row_processor", reduced_path) in context.attributes:
decorator = context.attributes[("eager_row_processor", reduced_path)]
else:
- if self._should_log_debug:
- self.logger.debug("Could not locate aliased clauses for key: " + str(path))
return False
try:
identity_key = self.mapper.identity_key_from_row(row, decorator)
return decorator
except KeyError, k:
- # no identity key - dont return a row processor, will cause a degrade to lazy
- if self._should_log_debug:
- self.logger.debug("could not locate identity key from row; missing column '%s'" % k)
+ # no identity key - don't return a row
+ # processor, will cause a degrade to lazy
return False
def create_row_processor(self, context, path, mapper, row, adapter):
if eager_adapter is not False:
key = self.key
- _instance = self.mapper._instance_processor(context, path + (self.mapper,), eager_adapter)
+ _instance = self.mapper._instance_processor(
+ context,
+ path + (self.mapper,),
+ eager_adapter)
if not self.uselist:
- def execute(state, dict_, row, isnew, **flags):
+ def execute(state, dict_, row, isnew):
if isnew:
# set a scalar object instance directly on the
# parent object, bypassing InstrumentedAttribute
"Multiple rows returned with "
"uselist=False for eagerly-loaded attribute '%s' " % self)
else:
- def execute(state, dict_, row, isnew, **flags):
+ def execute(state, dict_, row, isnew):
if isnew or (state, key) not in context.attributes:
# appender_key can be absent from context.attributes with isnew=False
- # when self-referential eager loading is used; the same instance may be present
- # in two distinct sets of result columns
+ # when self-referential eager loading is used; the same instance
+ # may be present in two distinct sets of result columns
collection = attributes.init_state_collection(state, dict_, key)
appender = util.UniqueAppender(collection, 'append_without_event')
_instance(row, result_list)
- if self._should_log_debug:
- execute = self.debug_callable(execute, self.logger,
- "%s returning eager instance loader" % self,
- lambda state, dict_, row, isnew, **flags: "%s eagerload %s" % \
- (self,
- self.uselist and "scalar attribute"
- or "collection")
- )
-
- return (execute, execute)
+ return execute, execute
else:
- if self._should_log_debug:
- self.logger.debug("%s degrading to lazy loader" % self)
- return self.parent_property._get_strategy(LazyLoader).create_row_processor(context, path, mapper, row, adapter)
+ return self.parent_property._get_strategy(LazyLoader).\
+ create_row_processor(context, path, mapper, row, adapter)
log.class_logger(EagerLoader)
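
Unlike the column strategies, EagerLoader returns the same callable in both slots of the 2-tuple: a joined eager load repeats the parent's columns once per child row, so rows for an already-seen parent (isnew=False) must still be processed to append further collection members. With hypothetical data:

    rows = [
        # (users.id, users.name, addresses.id, addresses.email)
        (1, 'ed',    1, 'ed@example.com'),     # isnew=True for user 1
        (1, 'ed',    2, 'ed@other.com'),       # isnew=False: same user, new child
        (2, 'wendy', 3, 'wendy@example.com'),  # isnew=True for user 2
    ]

    seen = set()
    for user_id, name, addr_id, email in rows:
        isnew = user_id not in seen
        seen.add(user_id)
        # both the "new" and "existing" processors append the child here
        print user_id, name, email, "isnew=%s" % isnew
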
-import testenv; testenv.simple_setup()
import time, resource
from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.test.util import gc_collect
-
+from sqlalchemy.test import profiling
db = create_engine('sqlite://')
metadata = MetaData(db)
person.age = row.age
people.append(person)
+#@profiling.profiled(report=True, always=True)
def orm_select():
session = create_session()
people = session.query(Person).all()