return self._modified
def check_modified(self):
- """return True if any InstanceStates present have been marked as 'modified'."""
+ """return True if any InstanceStates present have been marked
+ as 'modified'.
+ """
return bool(self._modified)
def has_key(self, key):
def __delitem__(self, key):
raise NotImplementedError("IdentityMap uses remove() to remove data")
+
class WeakInstanceDict(IdentityMap):
def __init__(self):
IdentityMap.__init__(self)
if existing_state is not state:
o = existing_state.obj()
if o is not None:
- raise AssertionError("A conflicting state is already "
- "present in the identity map for key %r"
- % (key, ))
+ raise AssertionError(
+ "A conflicting state is already "
+ "present in the identity map for key %r"
+ % (key, ))
else:
return
except KeyError:
# return iter(self._values())
# Py2K
items = _items
+
def iteritems(self):
return iter(self.items())
values = _values
+
def itervalues(self):
return iter(self.values())
# end Py2K
def prune(self):
return 0
+
class StrongInstanceDict(IdentityMap):
def all_states(self):
return [attributes.instance_state(o) for o in self.itervalues()]
def contains_state(self, state):
- return state.key in self and attributes.instance_state(self[state.key]) is state
+ return (
+ state.key in self and
+ attributes.instance_state(self[state.key]) is state)
def replace(self, state):
if dict.__contains__(self, state.key):
dict.update(self, keepers)
self.modified = bool(dirty)
return ref_count - len(self)
-
from . import exc, collections, events
from operator import attrgetter
from .. import event, util
-import weakref
state = util.importlater("sqlalchemy.orm", "state")
+
class ClassManager(dict):
"""tracks state information at the class level."""
return '<%s of %r at %x>' % (
self.__class__.__name__, self.class_, id(self))
+
class InstrumentationFactory(object):
"""Factory for new ClassManager instances."""
# when imported.
_instrumentation_factory = InstrumentationFactory()
+
def register_class(class_):
"""Register class instrumentation.
manager = _instrumentation_factory.create_manager_for_cls(class_)
return manager
+
def unregister_class(class_):
"""Unregister class instrumentation."""
manager_of_class = _default_manager_getter = ClassManager.manager_getter()
+
def _generate_init(class_, class_manager):
"""Build an __init__ decorator that triggers ClassManager events."""
#if func_kw_defaults:
# __init__.__kwdefaults__ = func_kw_defaults
return __init__
-
"""
from __future__ import absolute_import
-from itertools import chain
from .. import exc as sa_exc, util, inspect
from ..sql import operators
SessionExtension, \
MapperExtension
+
class _InspectionAttr(object):
"""Define a series of attributes that all ORM inspection
targets need to have."""
is_attribute = False
is_clause_element = False
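# Hedged illustration (not part of this patch): these boolean flags are what
# sqlalchemy.inspect() consumers check to tell inspection targets apart.
# ``User`` below is an assumed mapped class.
#
#     from sqlalchemy import inspect
#     insp = inspect(User.name)     # an InstrumentedAttribute
#     insp.is_attribute             # True for mapped attributes
#     insp.is_clause_element        # False; not a Core clause element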
+
class _MappedAttribute(object):
"""Mixin for attributes which should be replaced by mapper-assigned
attributes.
"""
+
+
class MapperProperty(_MappedAttribute, _InspectionAttr):
"""Manage the relationship of a ``Mapper`` to a single class
attribute, as well as that attribute as it appears on individual
mapped :class:`.Column`, which is represented in a mapping as
an instance of :class:`.ColumnProperty`,
and a reference to another class produced by :func:`.relationship`,
- represented in the mapping as an instance of :class:`.RelationshipProperty`.
+ represented in the mapping as an instance of
+ :class:`.RelationshipProperty`.
"""
"""
pass
-
def is_primary(self):
"""Return True if this ``MapperProperty``'s mapper is the
primary mapper for its class.
return operator(self.comparator, value)
+
class PropComparator(operators.ColumnOperators):
"""Defines boolean, comparison, and other operators for
:class:`.MapperProperty` objects.
SQLAlchemy allows for operators to
be redefined at both the Core and ORM level. :class:`.PropComparator`
is the base class of operator redefinition for ORM-level operations,
- including those of :class:`.ColumnProperty`, :class:`.RelationshipProperty`,
- and :class:`.CompositeProperty`.
+ including those of :class:`.ColumnProperty`,
+ :class:`.RelationshipProperty`, and :class:`.CompositeProperty`.
.. note:: With the advent of Hybrid properties introduced in SQLAlchemy
0.7, as well as Core-level operator redefinition in
class SomeMappedClass(Base):
some_column = column_property(Column("some_column", String),
- comparator_factory=MyColumnComparator)
+ comparator_factory=MyColumnComparator)
some_relationship = relationship(SomeOtherClass,
- comparator_factory=MyRelationshipComparator)
+ comparator_factory=MyRelationshipComparator)
some_composite = composite(
Column("a", String), Column("b", String),
self._parentmapper = parentmapper
self.adapter = adapter
-
def __clause_element__(self):
raise NotImplementedError("%r" % self)
query.join(Company.employees.of_type(Engineer)).\\
filter(Engineer.name=='foo')
- :param \class_: a class or mapper indicating that criterion will be against
- this specific subclass.
+ :param \class_: a class or mapper indicating that criterion will be
+ against this specific subclass.
"""
:param criterion: an optional ClauseElement formulated against the
member class' table or attributes.
- :param \**kwargs: key/value pairs corresponding to member class attribute
- names which will be compared via equality to the corresponding
- values.
+ :param \**kwargs: key/value pairs corresponding to member class
+ attribute names which will be compared via equality to the
+ corresponding values.
"""
:param criterion: an optional ClauseElement formulated against the
member class' table or attributes.
- :param \**kwargs: key/value pairs corresponding to member class attribute
- names which will be compared via equality to the corresponding
- values.
+ :param \**kwargs: key/value pairs corresponding to member class
+ attribute names which will be compared via equality to the
+ corresponding values.
"""
not mapper.class_manager._attr_has_impl(self.key):
self.strategy.init_class_attribute(mapper)
+
class MapperOption(object):
"""Describe a modification to a Query."""
self.process_query(query)
+
class PropertyOption(MapperOption):
"""A MapperOption that is applied to a property off the mapper or
one of its child mappers, identified by a dot-separated key
return paths
+
class StrategizedOption(PropertyOption):
"""A MapperOption that affects which LoaderStrategy will be used
for an operation by a StrategizedProperty.
def get_strategy_class(self):
raise NotImplementedError()
+
class LoaderStrategy(object):
"""Describe the loading behavior of a StrategizedProperty object.
def __str__(self):
return str(self.parent_property)
-
-
_new_runid = util.counter()
+
def instances(query, cursor, context):
"""Return an ORM result as an iterator."""
session = query.session
if not query._yield_per:
break
+
def merge_result(query, iterator, load=True):
"""Merge a result into this :class:`.Query` object's Session."""
finally:
session.autoflush = autoflush
+
def get_from_identity(session, key, passive):
"""Look up the given key in the given session's identity map,
check the object for expired state if found.
else:
return None
+
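# Hedged usage sketch (``session`` and ``User`` are assumed, not from this
# patch): the key is the identity-map key tuple, and PASSIVE_OFF allows an
# expired instance to be refreshed during the check.
#
#     from sqlalchemy.orm import util as orm_util
#     key = orm_util.identity_key(User, 1)      # e.g. (User, (1,))
#     obj = get_from_identity(session, key, attributes.PASSIVE_OFF)
#     # returns the mapped instance, or None if the key is not present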
def load_on_ident(query, key,
refresh_state=None, lockmode=None,
only_load_props=None):
except orm_exc.NoResultFound:
return None
+
def instance_processor(mapper, context, path, adapter,
polymorphic_from=None,
only_load_props=None,
if isnew:
state.manager.dispatch.refresh(state, context, attrs)
-
if result is not None:
if append_result:
for fn in append_result:
return instance
return _instance
+
def _populators(mapper, context, path, row, adapter,
new_populators, existing_populators, eager_populators):
"""Produce a collection of attribute level row processor
if delayed_populators:
new_populators.extend(delayed_populators)
+
def _configure_subclass_mapper(mapper, context, path, adapter):
"""Produce a mapper level row processor callable factory for mappers
inheriting this one."""
polymorphic_from=mapper)
return configure_subclass_mapper
+
def load_scalar_attributes(mapper, state, attribute_names):
"""initiate a column-based attribute refresh operation."""
# may not complete (even if PK attributes are assigned)
if has_key and result is None:
raise orm_exc.ObjectDeletedError(state)
-
# lock used to synchronize the "mapper configure" step
_CONFIGURE_MUTEX = util.threading.RLock()
+
class Mapper(_InspectionAttr):
"""Define the correlation of class attributes to database table
columns.
column=None):
self._adapt_inherited_property(key, prop, False)
-
def _set_polymorphic_on(self, polymorphic_on):
self.polymorphic_on = polymorphic_on
self._configure_polymorphic_setter(True)
configure_mappers()
return self
-
@property
@util.deprecated("0.7", message=":attr:`.Mapper.compiled` "
"is replaced by :attr:`.Mapper.configured`")
else:
self._set_polymorphic_identity = None
-
-
def _adapt_inherited_property(self, key, prop, init):
if not self.concrete:
self._configure_property(key, prop, init=False, setparent=False)
return [self]
return self._mappers_from_spec(*self.with_polymorphic)
-
@_memoized_configured_property
def _with_polymorphic_selectable(self):
if not self.with_polymorphic:
return state.manager[prop.key].impl.\
get_committed_value(state, dict_, passive=passive)
-
def _optimized_get_statement(self, state, attribute_names):
"""assemble a WHERE clause which retrieves a given state by primary
key, using a minimized set of tables.
inspection._self_inspects(Mapper)
log.class_logger(Mapper)
+
def configure_mappers():
"""Initialize the inter-mapper relationships of all mappers that
have been constructed thus far.
if _call_configured is not None:
_call_configured.dispatch.after_configured()
+
def reconstructor(fn):
"""Decorate a method as the 'reconstructor' hook.
fn.__sa_reconstructor__ = True
return fn
+
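# Hedged usage sketch (``MyMappedClass``/``Base`` are illustrative): the
# decorated method fires when an instance is loaded from the database,
# where ``__init__`` is not called.
#
#     from sqlalchemy.orm import reconstructor
#
#     class MyMappedClass(Base):
#         __tablename__ = 'my_table'
#         id = Column(Integer, primary_key=True)
#
#         @reconstructor
#         def init_on_load(self):
#             self._cache = {}   # rebuilt for instances coming from a query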
def validates(*names, **kw):
"""Decorate a method as a 'validator' for one or more named properties.
return fn
return wrap
+
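# Hedged usage sketch (``User``/``Base`` are illustrative): the validator
# receives the attribute name and the incoming value, and must return the
# value to be set.
#
#     from sqlalchemy.orm import validates
#
#     class User(Base):
#         __tablename__ = 'user'
#         id = Column(Integer, primary_key=True)
#         email = Column(String)
#
#         @validates('email')
#         def validate_email(self, key, value):
#             assert '@' in value
#             return value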
def _event_on_load(state, ctx):
instrumenting_mapper = state.manager.info[_INSTRUMENTOR]
if instrumenting_mapper._reconstructor:
instrumenting_mapper._reconstructor(state.obj())
+
def _event_on_first_init(manager, cls):
"""Initial mapper compilation trigger.
if _new_mappers:
configure_mappers()
+
def _event_on_init(state, args, kwargs):
"""Run init_instance hooks.
if instrumenting_mapper._set_polymorphic_identity:
instrumenting_mapper._set_polymorphic_identity(state)
+
def _event_on_resurrect(state):
# re-populate the primary key elements
# of the dict based on the mapping.
from .util import _state_mapper, state_str, _attr_as_key
from ..sql import expression
+
def save_obj(base_mapper, states, uowtransaction, single=False):
"""Issue ``INSERT`` and/or ``UPDATE`` statements for a list
of objects.
_finalize_insert_update_commands(base_mapper, uowtransaction,
states_to_insert, states_to_update)
+
def post_update(base_mapper, states, uowtransaction, post_update_cols):
"""Issue UPDATE statements on behalf of a relationship() which
specifies post_update.
base_mapper,
states, uowtransaction)
-
for table, mapper in base_mapper._sorted_tables.iteritems():
update = _collect_post_update_commands(base_mapper, uowtransaction,
table, states_to_update,
cached_connections,
mapper, table, update)
+
def delete_obj(base_mapper, states, uowtransaction):
"""Issue ``DELETE`` statements for a list of objects.
in states_to_delete:
mapper.dispatch.after_delete(mapper, connection, state)
+
def _organize_states_for_save(base_mapper, states, uowtransaction):
"""Make an initial pass across a set of states for INSERT or
UPDATE.
return states_to_insert, states_to_update
+
def _organize_states_for_post_update(base_mapper, states,
uowtransaction):
"""Make an initial pass across a set of states for UPDATE
return list(_connections_for_states(base_mapper, uowtransaction,
states))
+
def _organize_states_for_delete(base_mapper, states, uowtransaction):
"""Make an initial pass across a set of states for DELETE.
bool(state.key), connection))
return states_to_delete
+
def _collect_insert_commands(base_mapper, uowtransaction, table,
states_to_insert):
"""Identify sets of values to use in INSERT statements for a
connection, value_params, has_all_pks))
return insert
+
def _collect_update_commands(base_mapper, uowtransaction,
table, states_to_update):
"""Identify sets of values to use in UPDATE statements for a
connection))
return update
+
def _collect_delete_commands(base_mapper, uowtransaction, table,
states_to_delete):
"""Identify values to use in DELETE statements for a list of
c.dialect.dialect_description,
stacklevel=12)
+
def _emit_insert_statements(base_mapper, uowtransaction,
cached_connections, table, insert):
"""Emit INSERT statements corresponding to value lists collected
value_params)
-
def _emit_post_update_statements(base_mapper, uowtransaction,
cached_connections, mapper, table, update):
"""Emit UPDATE statements corresponding to value lists collected
else:
mapper.dispatch.after_update(mapper, connection, state)
+
def _postfetch(mapper, uowtransaction, table,
state, dict_, prefetch_cols, postfetch_cols,
params, value_params):
uowtransaction,
mapper.passive_updates)
+
def _connections_for_states(base_mapper, uowtransaction, states):
"""Return an iterator of (state, state.dict, mapper, connection).
yield state, state.dict, mapper, connection
+
def _cached_connection_dict(base_mapper):
# dictionary of connection->connection_with_cache_options.
return util.PopulateDict(
compiled_cache=base_mapper._compiled_cache
))
+
def _sort_states(states):
pending = set(states)
persistent = set(s for s in pending if s.key is not None)
return sorted(pending, key=operator.attrgetter("insert_order")) + \
sorted(persistent, key=lambda q: q.key[1])
+
class BulkUD(object):
"""Handle bulk update and deletes via a :class:`.Query`."""
def _do_post_synchronize(self):
pass
+
class BulkEvaluate(BulkUD):
"""BulkUD which does the 'evaluate' method of session state resolution."""
if issubclass(cls, target_cls) and
eval_condition(obj)]
+
class BulkFetch(BulkUD):
"""BulkUD which does the 'fetch' method of session state resolution."""
select_stmt,
params=query._params).fetchall()
+
class BulkUpdate(BulkUD):
"""BulkUD which handles UPDATEs."""
session.dispatch.after_bulk_update(session, self.query,
self.context, self.result)
+
class BulkDelete(BulkUD):
"""BulkUD which handles DELETEs."""
session.dispatch.after_bulk_delete(session, self.query,
self.context, self.result)
+
class BulkUpdateEvaluate(BulkEvaluate, BulkUpdate):
"""BulkUD which handles UPDATEs using the "evaluate"
method of session resolution."""
states.add(state)
session._register_altered(states)
+
class BulkDeleteEvaluate(BulkEvaluate, BulkDelete):
"""BulkUD which handles DELETEs using the "evaluate"
method of session resolution."""
[attributes.instance_state(obj)
for obj in self.matched_objects])
+
class BulkUpdateFetch(BulkFetch, BulkUpdate):
"""BulkUD which handles UPDATEs using the "fetch"
method of session resolution."""
session._expire_state(state, attrib)
session._register_altered(states)
+
class BulkDeleteFetch(BulkFetch, BulkDelete):
"""BulkUD which handles DELETEs using the "fetch"
method of session resolution."""
session.identity_map[identity_key]
)]
)
-
def __init__(self, *columns, **kwargs):
"""Construct a ColumnProperty.
- Note the public constructor is the :func:`.orm.column_property` function.
+ Note the public constructor is the :func:`.orm.column_property`
+ function.
:param \*columns: The list of `columns` describes a single
object property. If there are multiple tables joined
else:
self.strategy_class = strategies.ColumnLoader
-
@property
def expression(self):
"""Return the primary column or expression for this ColumnProperty.
"""Produce boolean, comparison, and other operators for
:class:`.ColumnProperty` attributes.
- See the documentation for :class:`.PropComparator` for a brief overview.
+ See the documentation for :class:`.PropComparator` for a brief
+ overview.
See also:
return self.adapter(self.prop.columns[0])
else:
return self.prop.columns[0]._annotate({
- "parententity": self._parentmapper,
- "parentmapper": self._parentmapper})
+ "parententity": self._parentmapper,
+ "parentmapper": self._parentmapper})
def __getattr__(self, key):
"""proxy attribute access down to the mapped column.
log.class_logger(ColumnProperty)
+
class RelationshipProperty(StrategizedProperty):
"""Describes an object property that holds a single item or list
of items that correspond to a related database table.
# should not correlate or otherwise reach out
# to anything in the enclosing query.
if criterion is not None:
- criterion = criterion._annotate({'no_replacement_traverse': True})
+ criterion = criterion._annotate(
+ {'no_replacement_traverse': True})
crit = j & criterion
will produce::
SELECT * FROM my_table WHERE
- NOT EXISTS (SELECT 1 FROM related WHERE related.my_id=my_table.id)
+ NOT EXISTS (SELECT 1 FROM related WHERE
+ related.my_id=my_table.id)
:meth:`~.RelationshipProperty.Comparator.any` is only
valid for collections, i.e. a :func:`.relationship`
Will produce a query like::
SELECT * FROM my_table WHERE
- EXISTS (SELECT 1 FROM related WHERE related.id==my_table.related_id
- AND related.x=2)
+ EXISTS (SELECT 1 FROM related WHERE
+ related.id==my_table.related_id AND related.x=2)
Because :meth:`~.RelationshipProperty.Comparator.has` uses
a correlated subquery, its performance is not nearly as
state = attributes.instance_state(other)
def state_bindparam(x, state, col):
- o = state.obj() # strong ref
- return sql.bindparam(x, unique=True, callable_=lambda : \
- self.property.mapper._get_committed_attr_by_column(o,
- col))
+ o = state.obj() # strong ref
+ return sql.bindparam(x, unique=True, callable_=lambda: \
+ self.property.mapper._get_committed_attr_by_column(o, col))
def adapt(col):
if self.adapter:
adapt(x) == None)
for (x, y) in self.property.local_remote_pairs])
- criterion = sql.and_(*[x==y for (x, y) in
+ criterion = sql.and_(*[x == y for (x, y) in
zip(
self.property.mapper.primary_key,
self.property.\
if (source_state, r) in _recursive:
return
-
if not "merge" in self.cascade:
return
else:
return [(attributes.instance_state(x), x)]
-
- def cascade_iterator(self, type_, state, dict_, visited_states, halt_on=None):
+ def cascade_iterator(self, type_, state, dict_,
+ visited_states, halt_on=None):
#assert type_ in self.cascade
# only actively lazy load on the 'delete' cascade
yield c, instance_mapper, instance_state, instance_dict
-
def _add_reverse_property(self, key):
other = self.mapper.get_property(key, _configure_mappers=False)
self._reverse_property.add(other)
"cause dependency issues during flush"
% (self.key, self.parent, inheriting))
-
def _check_cascade_settings(self):
if self.cascade.delete_orphan and not self.single_parent \
and (self.direction is MANYTOMANY or self.direction
PropertyLoader = RelationProperty = RelationshipProperty
log.class_logger(RelationshipProperty)
-
from .util import (
AliasedClass, ORMAdapter, _entity_descriptor, PathRegistry,
_is_aliased_class, _is_mapped_class, _orm_columns,
- join as orm_join,with_parent, aliased
+ join as orm_join, with_parent, aliased
)
from .. import sql, util, log, exc as sa_exc, inspect, inspection, \
types as sqltypes
_path_registry = PathRegistry.root
+
class Query(object):
"""ORM-level SQL construction object.
self._from_obj_alias = sql_util.ColumnAdapter(
self._from_obj[0], equivs)
-
def _reset_polymorphic_adapter(self, mapper):
for m2 in mapper._with_polymorphic_mappers:
self._polymorphic_adapters.pop(m2, None)
return self._select_from_entity or \
self._entity_zero().entity_zero
-
@property
def _mapper_entities(self):
# TODO: this is wrong, it's hardcoded to "primary entity" when
)
return self._entity_zero()
-
def __all_equivs(self):
equivs = {}
for ent in self._mapper_entities:
return self.enable_eagerloads(False).statement.label(name)
-
def as_scalar(self):
- """Return the full SELECT statement represented by this :class:`.Query`, converted
- to a scalar subquery.
+ """Return the full SELECT statement represented by this
+ :class:`.Query`, converted to a scalar subquery.
Analogous to :meth:`sqlalchemy.sql.SelectBaseMixin.as_scalar`.
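# Hedged usage sketch (``User``/``Address`` assumed): the scalar form lets a
# Query be embedded as a correlated subquery inside another statement.
#
#     from sqlalchemy import func
#     addr_count = session.query(func.count(Address.id)).\
#         filter(Address.user_id == User.id).\
#         as_scalar()
#     session.query(User.name, addr_count).all()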
@property
def whereclause(self):
- """A readonly attribute which returns the current WHERE criterion for this Query.
+ """A readonly attribute which returns the current WHERE criterion for
+ this Query.
This returned value is a SQL expression construct, or ``None`` if no
criterion has been established.
:meth:`.Query.with_polymorphic` applies transformations
to the "main" mapped class represented by this :class:`.Query`.
The "main" mapped class here means the :class:`.Query`
- object's first argument is a full class, i.e. ``session.query(SomeClass)``.
- These transformations allow additional tables to be present
- in the FROM clause so that columns for a joined-inheritance
- subclass are available in the query, both for the purposes
- of load-time efficiency as well as the ability to use
+ object's first argument is a full class, i.e.
+ ``session.query(SomeClass)``. These transformations allow additional
+ tables to be present in the FROM clause so that columns for a
+ joined-inheritance subclass are available in the query, both for the
+ purposes of load-time efficiency as well as the ability to use
these columns at query time.
See the documentation section :ref:`with_polymorphic` for
not mapper.always_refresh and \
self._lockmode is None:
- instance = loading.get_from_identity(self.session, key, attributes.PASSIVE_OFF)
+ instance = loading.get_from_identity(
+ self.session, key, attributes.PASSIVE_OFF)
if instance is not None:
# reject calls for id in identity map but class
# mismatch.
@_generative()
def with_entities(self, *entities):
- """Return a new :class:`.Query` replacing the SELECT list with the given
- entities.
+ """Return a new :class:`.Query` replacing the SELECT list with the
+ given entities.
e.g.::
"""
self._set_entities(entities)
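# Hedged usage sketch (``User``/``Address`` assumed): existing criteria are
# kept while the SELECT list is replaced.
#
#     q = session.query(User, Address).\
#         join(User.addresses).\
#         filter(User.name.like('%ed%'))
#     q = q.with_entities(Address.email_address)  # same FROM/WHERE, new columns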
-
@_generative()
def add_columns(self, *column):
"""Add one or more column expressions to the list
":meth:`.add_column` is superseded by :meth:`.add_columns`",
False)
def add_column(self, column):
- """Add a column expression to the list of result columns to be returned.
+ """Add a column expression to the list of result columns to be
+ returned.
Pending deprecation: :meth:`.add_column` will be superseded by
:meth:`.add_columns`.
"""
-
return self.add_columns(column)
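# Hedged usage sketch (``User`` assumed):
#
#     q = session.query(User).add_columns(User.name, User.email)
#     # each result row is then a tuple: (User instance, name, email)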
def options(self, *args):
"""
-
return self._from_selectable(
- expression.union(*([self]+ list(q))))
+ expression.union(*([self] + list(q))))
def union_all(self, *q):
"""Produce a UNION ALL of this Query against one or more queries.
"""
return self._from_selectable(
- expression.union_all(*([self]+ list(q)))
+ expression.union_all(*([self] + list(q)))
)
def intersect(self, *q):
"""
return self._from_selectable(
- expression.intersect(*([self]+ list(q)))
+ expression.intersect(*([self] + list(q)))
)
def intersect_all(self, *q):
"""
return self._from_selectable(
- expression.intersect_all(*([self]+ list(q)))
+ expression.intersect_all(*([self] + list(q)))
)
def except_(self, *q):
"""
return self._from_selectable(
- expression.except_(*([self]+ list(q)))
+ expression.except_(*([self] + list(q)))
)
def except_all(self, *q):
"""
return self._from_selectable(
- expression.except_all(*([self]+ list(q)))
+ expression.except_all(*([self] + list(q)))
)
def join(self, *props, **kwargs):
In the above example we refer to ``User.addresses`` as passed to
:meth:`~.Query.join` as the *on clause*, that is, it indicates
how the "ON" portion of the JOIN should be constructed. For a
- single-entity query such as the one above (i.e. we start by selecting only from
- ``User`` and nothing else), the relationship can also be specified by its
- string name::
+ single-entity query such as the one above (i.e. we start by selecting
+ only from ``User`` and nothing else), the relationship can also be
+ specified by its string name::
q = session.query(User).join("addresses")
q = session.query(User).join("orders", "items", "keywords")
- The above would be shorthand for three separate calls to :meth:`~.Query.join`,
- each using an explicit attribute to indicate the source entity::
+ The above would be shorthand for three separate calls to
+ :meth:`~.Query.join`, each using an explicit attribute to indicate
+ the source entity::
q = session.query(User).\\
join(User.orders).\\
There is a lot of flexibility in what the "target" can be when using
:meth:`~.Query.join`. As noted previously, it also accepts
- :class:`.Table` constructs and other selectables such as :func:`.alias`
- and :func:`.select` constructs, with either the one or two-argument forms::
+ :class:`.Table` constructs and other selectables such as
+ :func:`.alias` and :func:`.select` constructs, with either the one
+ or two-argument forms::
addresses_q = select([Address.user_id]).\\
- where(Address.email_address.endswith("@bar.com")).\\
- alias()
+ where(Address.email_address.endswith("@bar.com")).\\
+ alias()
q = session.query(User).\\
join(addresses_q, addresses_q.c.user_id==User.id)
:meth:`~.Query.join` also features the ability to *adapt* a
- :meth:`~sqlalchemy.orm.relationship` -driven ON clause to the target selectable.
- Below we construct a JOIN from ``User`` to a subquery against ``Address``, allowing
- the relationship denoted by ``User.addresses`` to *adapt* itself
- to the altered target::
+ :meth:`~sqlalchemy.orm.relationship` -driven ON clause to the target
+ selectable. Below we construct a JOIN from ``User`` to a subquery
+ against ``Address``, allowing the relationship denoted by
+ ``User.addresses`` to *adapt* itself to the altered target::
address_subq = session.query(Address).\\
- filter(Address.email_address == 'ed@foo.com').\\
- subquery()
+ filter(Address.email_address == 'ed@foo.com').\\
+ subquery()
q = session.query(User).join(address_subq, User.addresses)
if not create_aliases and prop:
self._update_joinpoint({
'_joinpoint_entity': right,
- 'prev':((left, right, prop.key), self._joinpoint)
+ 'prev': ((left, right, prop.key), self._joinpoint)
})
else:
self._joinpoint = {
if item == -1:
return list(self)[-1]
else:
- return list(self[item:item+1])[0]
+ return list(self[item:item + 1])[0]
@_generative(_no_statement_condition)
def slice(self, start, stop):
def _execute_and_instances(self, querycontext):
conn = self._connection_from_session(
- mapper = self._mapper_zero_or_none(),
- clause = querycontext.statement,
+ mapper=self._mapper_zero_or_none(),
+ clause=querycontext.statement,
close_with_result=True)
result = conn.execute(querycontext.statement, self._params)
return loading.instances(self, cursor, context)
-
def merge_result(self, iterator, load=True):
"""Merge a result into this :class:`.Query` object's Session.
- Given an iterator returned by a :class:`.Query` of the same structure as this
- one, return an identical iterator of results, with all mapped
- instances merged into the session using :meth:`.Session.merge`. This is an
- optimized method which will merge all mapped instances, preserving the
- structure of the result rows and unmapped columns with less method
- overhead than that of calling :meth:`.Session.merge` explicitly for each
- value.
+ Given an iterator returned by a :class:`.Query` of the same structure
+ as this one, return an identical iterator of results, with all mapped
+ instances merged into the session using :meth:`.Session.merge`. This
+ is an optimized method which will merge all mapped instances,
+ preserving the structure of the result rows and unmapped columns with
+ less method overhead than that of calling :meth:`.Session.merge`
+ explicitly for each value.
The structure of the results is determined based on the column list of
- this :class:`.Query` - if these do not correspond, unchecked errors will occur.
+ this :class:`.Query` - if these do not correspond, unchecked errors
+ will occur.
The 'load' argument is the same as that of :meth:`.Session.merge`.
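# Hedged usage sketch (``q`` is a Query of the same structure whose rows were
# produced earlier, e.g. by a hypothetical caching layer; names illustrative):
#
#     cached_rows = cache.get('user_listing')     # previously list(q)
#     merged_iter = q.merge_result(cached_rows, load=False)
#     # instances are attached to q.session without being re-queried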
@property
def _select_args(self):
return {
- 'limit':self._limit,
- 'offset':self._offset,
- 'distinct':self._distinct,
- 'prefixes':self._prefixes,
- 'group_by':self._group_by or None,
- 'having':self._having
+ 'limit': self._limit,
+ 'offset': self._offset,
+ 'distinct': self._distinct,
+ 'prefixes': self._prefixes,
+ 'group_by': self._group_by or None,
+ 'having': self._having
}
@property
removed from the session. Matched objects are removed from the
session.
- ``'evaluate'`` - Evaluate the query's criteria in Python straight on
- the objects in the session. If evaluation of the criteria isn't
+ ``'evaluate'`` - Evaluate the query's criteria in Python straight
+ on the objects in the session. If evaluation of the criteria isn't
implemented, an error is raised. In that case you probably
want to use the 'fetch' strategy as a fallback.
objects that are matched by the update query. The updated
attributes are expired on matched objects.
- ``'evaluate'`` - Evaluate the Query's criteria in Python straight on
- the objects in the session. If evaluation of the criteria isn't
+ ``'evaluate'`` - Evaluate the Query's criteria in Python straight
+ on the objects in the session. If evaluation of the criteria isn't
implemented, an exception is raised.
The expression evaluator currently doesn't account for differing
update_op.exec_()
return update_op.rowcount
-
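# Hedged usage sketch (``User`` assumed): the synchronize_session strategies
# described above are selected via the keyword argument.
#
#     session.query(User).\
#         filter(User.name == 'jack').\
#         update({"name": "ed"}, synchronize_session='evaluate')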
_lockmode_lookup = {
'read': 'read',
'read_nowait': 'read_nowait',
inspection._self_inspects(Query)
+
class _QueryEntity(object):
"""represent an entity column returned within a Query result."""
q.__dict__ = self.__dict__.copy()
return q
+
class _MapperEntity(_QueryEntity):
"""mapper/class/AliasedClass entity"""
def setup_entity(self, ext_info, aliased_adapter):
self.mapper = ext_info.mapper
self.aliased_adapter = aliased_adapter
- self.selectable = ext_info.selectable
+ self.selectable = ext_info.selectable
self.is_aliased_class = ext_info.is_aliased_class
self._with_polymorphic = ext_info.with_polymorphic_mappers
self._polymorphic_discriminator = \
# require row aliasing unconditionally.
if not adapter and self.mapper._requires_row_aliasing:
adapter = sql_util.ColumnAdapter(
- self.selectable,
- self.mapper._equivalent_columns)
+ self.selectable,
+ self.mapper._equivalent_columns)
if self.primary_entity:
_instance = loading.instance_processor(
- self.mapper,
- context,
- self.path,
- adapter,
- only_load_props=query._only_load_props,
- refresh_state=context.refresh_state,
- polymorphic_discriminator=
- self._polymorphic_discriminator
+ self.mapper,
+ context,
+ self.path,
+ adapter,
+ only_load_props=query._only_load_props,
+ refresh_state=context.refresh_state,
+ polymorphic_discriminator=self._polymorphic_discriminator
)
else:
_instance = loading.instance_processor(
- self.mapper,
- context,
- self.path,
- adapter,
- polymorphic_discriminator=
- self._polymorphic_discriminator)
+ self.mapper,
+ context,
+ self.path,
+ adapter,
+ polymorphic_discriminator=self._polymorphic_discriminator
+ )
return _instance, self._label_name
def __str__(self):
return str(self.mapper)
+
class _ColumnEntity(_QueryEntity):
"""Column/expression based entity."""
if c is not column:
return
-
if not isinstance(column, sql.ColumnElement):
raise sa_exc.InvalidRequestError(
"SQL expression, column, or mapped entity "
else:
self.entity_zero = None
-
@property
def entity_zero_or_selectable(self):
if self.entity_zero is not None:
def type(self):
return self.column.type
-
def adapt_to_selectable(self, query, sel):
c = _ColumnEntity(query, sel.corresponding_column(self.column))
c._label_name = self._label_name
def __str__(self):
return str(self.column)
+
log.class_logger(Query)
+
class QueryContext(object):
multi_row_eager_loaders = False
adapter = None
else:
alias = self.alias
query._from_obj_alias = sql_util.ColumnAdapter(alias)
-
-
from ..sql import operators, expression, visitors
from .interfaces import MANYTOMANY, MANYTOONE, ONETOMANY
+
def remote(expr):
"""Annotate a portion of a primaryjoin expression
with a 'remote' annotation.
return _annotate_columns(expression._clause_element_as_expr(expr),
{"remote": True})
+
def foreign(expr):
"""Annotate a portion of a primaryjoin expression
with a 'foreign' annotation.
element = clone(element)
return element
+
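# Hedged usage sketch (``Node``/``Base`` illustrative): foreign()/remote()
# annotate columns inside an explicit primaryjoin so JoinCondition (below)
# can determine direction without ForeignKey metadata.
#
#     from sqlalchemy.orm import relationship, foreign, remote
#
#     class Node(Base):
#         __tablename__ = 'node'
#         id = Column(Integer, primary_key=True)
#         parent_id = Column(Integer)
#
#         parent = relationship(
#             "Node",
#             primaryjoin=remote(id) == foreign(parent_id),
#         )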
class JoinCondition(object):
def __init__(self,
parent_selectable,
# general mapped table, which in the case of inheritance is
# a join.
try:
+ consider_as_foreign_keys = self.consider_as_foreign_keys or None
if self.secondary is not None:
if self.secondaryjoin is None:
self.secondaryjoin = \
join_condition(
- self.child_selectable,
- self.secondary,
- a_subset=self.child_local_selectable,
- consider_as_foreign_keys=\
- self.consider_as_foreign_keys or None
- )
+ self.child_selectable,
+ self.secondary,
+ a_subset=self.child_local_selectable,
+ consider_as_foreign_keys=consider_as_foreign_keys
+ )
if self.primaryjoin is None:
self.primaryjoin = \
join_condition(
- self.parent_selectable,
- self.secondary,
- a_subset=self.parent_local_selectable,
- consider_as_foreign_keys=\
- self.consider_as_foreign_keys or None
- )
+ self.parent_selectable,
+ self.secondary,
+ a_subset=self.parent_local_selectable,
+ consider_as_foreign_keys=consider_as_foreign_keys
+ )
else:
if self.primaryjoin is None:
self.primaryjoin = \
join_condition(
- self.parent_selectable,
- self.child_selectable,
- a_subset=self.parent_local_selectable,
- consider_as_foreign_keys=\
- self.consider_as_foreign_keys or None
- )
+ self.parent_selectable,
+ self.child_selectable,
+ a_subset=self.parent_local_selectable,
+ consider_as_foreign_keys=consider_as_foreign_keys
+ )
except sa_exc.NoForeignKeysError:
if self.secondary is not None:
raise sa_exc.NoForeignKeysError("Could not determine join "
def _annotate_from_fk_list(self):
def check_fk(col):
if col in self.consider_as_foreign_keys:
- return col._annotate({"foreign":True})
+ return col._annotate({"foreign": True})
self.primaryjoin = visitors.replacement_traverse(
self.primaryjoin,
{},
secondarycols = util.column_set(self.secondary.c)
else:
secondarycols = set()
+
def is_foreign(a, b):
if isinstance(a, schema.Column) and \
isinstance(b, schema.Column):
if col is not None:
if col.compare(binary.left):
binary.left = binary.left._annotate(
- {"foreign":True})
+ {"foreign": True})
elif col.compare(binary.right):
binary.right = binary.right._annotate(
- {"foreign":True})
+ {"foreign": True})
self.primaryjoin = visitors.cloned_traverse(
self.primaryjoin,
{},
- {"binary":visit_binary}
+ {"binary": visit_binary}
)
if self.secondaryjoin is not None:
self.secondaryjoin = visitors.cloned_traverse(
self.secondaryjoin,
{},
- {"binary":visit_binary}
+ {"binary": visit_binary}
)
def _refers_to_parent_table(self):
pt = self.parent_selectable
mt = self.child_selectable
result = [False]
+
def visit_binary(binary):
c, f = binary.left, binary.right
if (
visitors.traverse(
self.primaryjoin,
{},
- {"binary":visit_binary}
+ {"binary": visit_binary}
)
return result[0]
elif self._local_remote_pairs or self._remote_side:
self._annotate_remote_from_args()
elif self._refers_to_parent_table():
- self._annotate_selfref(lambda col:"foreign" in col._annotations)
+ self._annotate_selfref(lambda col: "foreign" in col._annotations)
elif self._tables_overlap():
self._annotate_remote_with_overlap()
else:
"""
def repl(element):
if self.secondary.c.contains_column(element):
- return element._annotate({"remote":True})
+ return element._annotate({"remote": True})
self.primaryjoin = visitors.replacement_traverse(
self.primaryjoin, {}, repl)
self.secondaryjoin = visitors.replacement_traverse(
isinstance(binary.right, expression.ColumnClause):
# assume one to many - FKs are "remote"
if fn(binary.left):
- binary.left = binary.left._annotate({"remote":True})
+ binary.left = binary.left._annotate({"remote": True})
if fn(binary.right) and \
not equated:
binary.right = binary.right._annotate(
- {"remote":True})
+ {"remote": True})
else:
self._warn_non_column_elements()
self.primaryjoin = visitors.cloned_traverse(
self.primaryjoin, {},
- {"binary":visit_binary})
+ {"binary": visit_binary})
def _annotate_remote_from_args(self):
"""annotate 'remote' in primaryjoin, secondaryjoin
remote_side = self._remote_side
if self._refers_to_parent_table():
- self._annotate_selfref(lambda col:col in remote_side)
+ self._annotate_selfref(lambda col: col in remote_side)
else:
def repl(element):
if element in remote_side:
- return element._annotate({"remote":True})
+ return element._annotate({"remote": True})
self.primaryjoin = visitors.replacement_traverse(
self.primaryjoin, {}, repl)
binary.right)
binary.right, binary.left = proc_left_right(binary.right,
binary.left)
+
def proc_left_right(left, right):
if isinstance(left, expression.ColumnClause) and \
isinstance(right, expression.ColumnClause):
if self.child_selectable.c.contains_column(right) and \
self.parent_selectable.c.contains_column(left):
- right = right._annotate({"remote":True})
+ right = right._annotate({"remote": True})
else:
- self._warn_non_column_elements()
+ self._warn_non_column_elements()
return left, right
self.primaryjoin = visitors.cloned_traverse(
self.primaryjoin, {},
- {"binary":visit_binary})
+ {"binary": visit_binary})
def _annotate_remote_distinct_selectables(self):
"""annotate 'remote' in primaryjoin, secondaryjoin
or self.child_local_selectable.c.\
contains_column(element)
):
- return element._annotate({"remote":True})
+ return element._annotate({"remote": True})
self.primaryjoin = visitors.replacement_traverse(
self.primaryjoin, {}, repl)
def locals_(elem):
if "remote" not in elem._annotations and \
elem in local_side:
- return elem._annotate({"local":True})
+ return elem._annotate({"local": True})
self.primaryjoin = visitors.replacement_traverse(
self.primaryjoin, {}, locals_
)
self.local_remote_pairs = self._deannotate_pairs(lrp)
self.synchronize_pairs = self._deannotate_pairs(sync_pairs)
- self.secondary_synchronize_pairs = self._deannotate_pairs(secondary_sync_pairs)
+ self.secondary_synchronize_pairs = \
+ self._deannotate_pairs(secondary_sync_pairs)
@util.memoized_property
def remote_columns(self):
if annotation.issubset(col._annotations)
])
-
def join_targets(self, source_selectable,
dest_selectable,
aliased,
# regardless of context.
dest_selectable = _shallow_annotate(
dest_selectable,
- {'no_replacement_traverse':True})
+ {'no_replacement_traverse': True})
primaryjoin, secondaryjoin, secondary = self.primaryjoin, \
self.secondaryjoin, self.secondary
bind_to_col = dict((binds[col].key, col) for col in binds)
return lazywhere, bind_to_col, equated_columns
-
-
-
self.registry.clear()
def configure(self, **kwargs):
- """reconfigure the :class:`.sessionmaker` used by this :class:`.scoped_session`.
+ """reconfigure the :class:`.sessionmaker` used by this
+ :class:`.scoped_session`.
See :meth:`.sessionmaker.configure`.
ScopedSession = scoped_session
"""Old name for backwards compatibility."""
+
def instrument(name):
def do(self, *args, **kwargs):
return getattr(self.registry(), name)(*args, **kwargs)
return do
+
for meth in Session.public_methods:
setattr(scoped_session, meth, instrument(meth))
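# Hedged illustration (``engine`` assumed): each generated proxy forwards the
# call to the thread-local Session held in self.registry().
#
#     Session = scoped_session(sessionmaker(bind=engine))
#     Session.add(some_object)     # -> Session.registry().add(some_object)
#     Session.commit()             # -> Session.registry().commit()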
+
def makeprop(name):
def set(self, attr):
setattr(self.registry(), name, attr)
+
def get(self):
return getattr(self.registry(), name)
+
return property(get, set)
+
for prop in ('bind', 'dirty', 'deleted', 'new', 'identity_map',
- 'is_active', 'autoflush', 'no_autoflush'):
+ 'is_active', 'autoflush', 'no_autoflush'):
setattr(scoped_session, prop, makeprop(prop))
+
def clslevel(name):
def do(cls, *args, **kwargs):
return getattr(Session, name)(*args, **kwargs)
return classmethod(do)
+
for prop in ('close_all', 'object_session', 'identity_key'):
setattr(scoped_session, prop, clslevel(prop))
-