import operator
from operator import itemgetter
-from sqlalchemy import util, event, exc as sa_exc
+from sqlalchemy import util, event, exc as sa_exc, inspection
from sqlalchemy.orm import interfaces, collections, events, exc as orm_exc
""")
-
class QueryableAttribute(interfaces.PropComparator):
"""Base class for class-bound attributes. """
return self.comparator.property
+@inspection._inspects(QueryableAttribute)
+def _get_prop(source):
+ return source.property
+
class InstrumentedAttribute(QueryableAttribute):
"""Class bound instrumented attribute which adds descriptor methods."""
"""
- def __init__(self, class_, key, descriptor, comparator,
+ def __init__(self, class_, key, descriptor, property_,
+ comparator,
adapter=None, doc=None):
self.class_ = class_
self.key = key
self.descriptor = descriptor
+ self.original_property = property_
self._comparator = comparator
self.adapter = adapter
self.__doc__ = doc
if self.dispatch.remove:
self.fire_remove_event(state, dict_, old, None)
- state.modified_event(dict_, self, old)
+ state._modified_event(dict_, self, old)
del dict_[self.key]
def get_history(self, state, dict_, passive=PASSIVE_OFF):
if self.dispatch.set:
value = self.fire_replace_event(state, dict_,
value, old, initiator)
- state.modified_event(dict_, self, old)
+ state._modified_event(dict_, self, old)
dict_[self.key] = value
def fire_replace_event(self, state, dict_, value, previous, initiator):
for fn in self.dispatch.remove:
fn(state, value, initiator or self)
- state.modified_event(dict_, self, value)
+ state._modified_event(dict_, self, value)
def fire_replace_event(self, state, dict_, value, previous, initiator):
if self.trackparent:
for fn in self.dispatch.set:
value = fn(state, value, previous, initiator or self)
- state.modified_event(dict_, self, previous)
+ state._modified_event(dict_, self, previous)
if self.trackparent:
if value is not None:
for fn in self.dispatch.append:
value = fn(state, value, initiator or self)
- state.modified_event(dict_, self, NEVER_SET, True)
+ state._modified_event(dict_, self, NEVER_SET, True)
if self.trackparent and value is not None:
self.sethasparent(instance_state(value), state, True)
return value
def fire_pre_remove_event(self, state, dict_, initiator):
- state.modified_event(dict_, self, NEVER_SET, True)
+ state._modified_event(dict_, self, NEVER_SET, True)
def fire_remove_event(self, state, dict_, value, initiator):
if self.trackparent and value is not None:
for fn in self.dispatch.remove:
fn(state, value, initiator or self)
- state.modified_event(dict_, self, NEVER_SET, True)
+ state._modified_event(dict_, self, NEVER_SET, True)
def delete(self, state, dict_):
if self.key not in dict_:
return
- state.modified_event(dict_, self, NEVER_SET, True)
+ state._modified_event(dict_, self, NEVER_SET, True)
collection = self.get_collection(state, state.dict)
collection.clear_with_event()
value = self.fire_append_event(state, dict_, value, initiator)
assert self.key not in dict_, \
"Collection was loaded during event handling."
- state.get_pending(self.key).append(value)
+ state._get_pending_mutation(self.key).append(value)
else:
collection.append_with_event(value, initiator)
self.fire_remove_event(state, dict_, value, initiator)
assert self.key not in dict_, \
"Collection was loaded during event handling."
- state.get_pending(self.key).remove(value)
+ state._get_pending_mutation(self.key).remove(value)
else:
collection.remove_with_event(value, initiator)
return
# place a copy of "old" in state.committed_state
- state.modified_event(dict_, self, old, True)
+ state._modified_event(dict_, self, old, True)
old_collection = getattr(old, '_sa_adapter')
state.commit(dict_, [self.key])
- if self.key in state.pending:
+ if self.key in state._pending_mutations:
# pending items exist. issue a modified event,
# add/remove new items.
- state.modified_event(dict_, self, user_data, True)
+ state._modified_event(dict_, self, user_data, True)
- pending = state.pending.pop(self.key)
+ pending = state._pending_mutations.pop(self.key)
added = pending.added_items
removed = pending.deleted_items
for item in added:
"""
state, dict_ = instance_state(instance), instance_dict(instance)
impl = state.manager[key].impl
- state.modified_event(dict_, impl, NO_VALUE)
+ state._modified_event(dict_, impl, NO_VALUE)
self.parent.class_,
self.key,
self.descriptor,
+ self,
lambda: self._comparator_factory(mapper),
doc=self.doc
)
if self.key not in state.committed_state:
state.committed_state[self.key] = CollectionHistory(self, state)
- state.modified_event(dict_,
+ state._modified_event(dict_,
self,
attributes.NEVER_SET)
yield c
@util.memoized_property
- def properties(self):
+ def attr(self):
if _new_mappers:
configure_mappers()
return util.ImmutableProperties(self._props)
def _filter_properties(self, type_):
if _new_mappers:
configure_mappers()
- return dict(
+ return util.ImmutableProperties(dict(
(k, v) for k, v in self._props.iteritems()
if isinstance(v, type_)
- )
+ ))
@_memoized_configured_property
def _get_clause(self):
class ColumnProperty(StrategizedProperty):
"""Describes an object attribute that corresponds to a table column.
-
+
Public constructor is the :func:`.orm.column_property` function.
-
+
"""
def __init__(self, *columns, **kwargs):
else:
self.strategy_class = strategies.ColumnLoader
+ @property
+ def expression(self):
+ """Return the primary column or expression for this ColumnProperty.
+
+ """
+ return self.columns[0]
+
def instrument_class(self, mapper):
if not self.instrument:
return
class RelationshipProperty(StrategizedProperty):
"""Describes an object property that holds a single item or list
of items that correspond to a related database table.
-
+
Public constructor is the :func:`.orm.relationship` function.
-
+
Of note here is the :class:`.RelationshipProperty.Comparator`
class, which implements comparison operations for scalar-
and collection-referencing mapped attributes.
-
+
"""
strategy_wildcard_key = 'relationship:*'
else:
self.backref = backref
-
def instrument_class(self, mapper):
attributes.register_descriptor(
mapper.class_,
def __init__(self, prop, mapper, of_type=None, adapter=None):
"""Construction of :class:`.RelationshipProperty.Comparator`
is internal to the ORM's attribute mechanics.
-
+
"""
self.prop = prop
self.mapper = mapper
def of_type(self, cls):
"""Produce a construct that represents a particular 'subtype' of
attribute for the parent class.
-
+
Currently this is usable in conjunction with :meth:`.Query.join`
and :meth:`.Query.outerjoin`.
-
+
"""
return RelationshipProperty.Comparator(
self.property,
def in_(self, other):
"""Produce an IN clause - this is not implemented
for :func:`~.orm.relationship`-based attributes at this time.
-
+
"""
raise NotImplementedError('in_() not yet supported for '
'relationships. For a simple many-to-one, use '
this will typically produce a
clause such as::
-
+
mytable.related_id == <some id>
-
+
Where ``<some id>`` is the primary key of the given
object.
-
+
The ``==`` operator provides partial functionality for non-
many-to-one comparisons:
-
+
* Comparisons against collections are not supported.
Use :meth:`~.RelationshipProperty.Comparator.contains`.
* Compared to a scalar one-to-many, will produce a
def any(self, criterion=None, **kwargs):
"""Produce an expression that tests a collection against
particular criterion, using EXISTS.
-
+
An expression like::
-
+
session.query(MyClass).filter(
MyClass.somereference.any(SomeRelated.x==2)
)
-
-
+
+
Will produce a query like::
-
+
SELECT * FROM my_table WHERE
EXISTS (SELECT 1 FROM related WHERE related.my_id=my_table.id
AND related.x=2)
-
+
Because :meth:`~.RelationshipProperty.Comparator.any` uses
a correlated subquery, its performance is not nearly as
good when compared against large target tables as that of
using a join.
-
+
:meth:`~.RelationshipProperty.Comparator.any` is particularly
useful for testing for empty collections::
-
+
session.query(MyClass).filter(
~MyClass.somereference.any()
)
-
+
will produce::
-
+
SELECT * FROM my_table WHERE
NOT EXISTS (SELECT 1 FROM related WHERE related.my_id=my_table.id)
-
+
:meth:`~.RelationshipProperty.Comparator.any` is only
valid for collections, i.e. a :func:`.relationship`
that has ``uselist=True``. For scalar references,
use :meth:`~.RelationshipProperty.Comparator.has`.
-
+
"""
if not self.property.uselist:
raise sa_exc.InvalidRequestError(
particular criterion, using EXISTS.
An expression like::
-
+
session.query(MyClass).filter(
MyClass.somereference.has(SomeRelated.x==2)
)
-
-
+
+
Will produce a query like::
-
+
SELECT * FROM my_table WHERE
EXISTS (SELECT 1 FROM related WHERE related.id==my_table.related_id
AND related.x=2)
a correlated subquery, its performance is not nearly as
good when compared against large target tables as that of
using a join.
-
+
:meth:`~.RelationshipProperty.Comparator.has` is only
valid for scalar references, i.e. a :func:`.relationship`
that has ``uselist=False``. For collection references,
use :meth:`~.RelationshipProperty.Comparator.any`.
-
+
"""
if self.property.uselist:
raise sa_exc.InvalidRequestError(
def contains(self, other, **kwargs):
"""Return a simple expression that tests a collection for
containment of a particular item.
-
+
:meth:`~.RelationshipProperty.Comparator.contains` is
only valid for a collection, i.e. a
:func:`~.orm.relationship` that implements
one-to-many or many-to-many with ``uselist=True``.
-
+
When used in a simple one-to-many context, an
expression like::
-
+
MyClass.contains(other)
-
+
Produces a clause like::
-
+
mytable.id == <some id>
-
+
Where ``<some id>`` is the value of the foreign key
attribute on ``other`` which refers to the primary
key of its parent object. From this it follows that
:meth:`~.RelationshipProperty.Comparator.contains` is
very useful when used with simple one-to-many
operations.
-
+
For many-to-many operations, the behavior of
:meth:`~.RelationshipProperty.Comparator.contains`
has more caveats. The association table will be
rendered in the statement, producing an "implicit"
join, that is, includes multiple tables in the FROM
clause which are equated in the WHERE clause::
-
+
query(MyClass).filter(MyClass.contains(other))
-
+
Produces a query like::
-
+
SELECT * FROM my_table, my_association_table AS
my_association_table_1 WHERE
my_table.id = my_association_table_1.parent_id
AND my_association_table_1.child_id = <some id>
-
+
Where ``<some id>`` would be the primary key of
``other``. From the above, it is clear that
:meth:`~.RelationshipProperty.Comparator.contains`
a less-performant alternative using EXISTS, or refer
to :meth:`.Query.outerjoin` as well as :ref:`ormtutorial_joins`
for more details on constructing outer joins.
-
+
"""
if not self.property.uselist:
raise sa_exc.InvalidRequestError(
"""Implement the ``!=`` operator.
In a many-to-one context, such as::
-
+
MyClass.some_prop != <some object>
-
+
This will typically produce a clause such as::
-
+
mytable.related_id != <some id>
-
+
Where ``<some id>`` is the primary key of the
given object.
-
+
The ``!=`` operator provides partial functionality for non-
many-to-one comparisons:
-
+
* Comparisons against collections are not supported.
Use
:meth:`~.RelationshipProperty.Comparator.contains`
membership tests.
* Comparisons against ``None`` given in a one-to-many
or many-to-many context produce an EXISTS clause.
-
+
"""
if isinstance(other, (NoneType, expression._Null)):
if self.property.direction == MANYTOONE:
dest_state.get_impl(self.key).set(dest_state,
dest_dict, obj, None)
+ def _value_as_iterable(self, state, dict_, key,
+ passive=attributes.PASSIVE_OFF):
+ """Return a list of tuples (state, obj) for the given
+ key.
+
+ returns an empty list if the value is None/empty/PASSIVE_NO_RESULT
+ """
+
+ impl = state.manager[key].impl
+ x = impl.get(state, dict_, passive=passive)
+ if x is attributes.PASSIVE_NO_RESULT or x is None:
+ return []
+ elif hasattr(impl, 'get_collection'):
+ return [
+ (attributes.instance_state(o), o) for o in
+ impl.get_collection(state, dict_, x, passive=passive)
+ ]
+ else:
+ return [(attributes.instance_state(x), x)]
+
+
def cascade_iterator(self, type_, state, dict_, visited_states, halt_on=None):
#assert type_ in self.cascade
get_all_pending(state, dict_)
else:
- tuples = state.value_as_iterable(dict_, self.key,
+ tuples = self._value_as_iterable(state, dict_, self.key,
passive=passive)
skip_pending = type_ == 'refresh-expire' and 'delete-orphan' \
def mapper(self):
"""Return the targeted :class:`.Mapper` for this
:class:`.RelationshipProperty`.
-
+
This is a lazy-initializing static attribute.
-
+
"""
if isinstance(self.argument, type):
mapper_ = mapper.class_mapper(self.argument,
def _process_dependent_arguments(self):
"""Convert incoming configuration arguments to their
proper form.
-
+
Callables are resolved, ORM annotations removed.
-
+
"""
# accept callables for other attributes which may require
# deferred initialization. This technique is used
def _determine_joins(self):
"""Determine the 'primaryjoin' and 'secondaryjoin' attributes,
if not passed to the constructor already.
-
+
This is based on analysis of the foreign key relationships
between the parent and target mapped selectables.
-
+
"""
if self.secondaryjoin is not None and self.secondary is None:
raise sa_exc.ArgumentError("Property '" + self.key
def _columns_are_mapped(self, *cols):
"""Return True if all columns in the given collection are
mapped by the tables referenced by this :class:`.Relationship`.
-
+
"""
for c in cols:
if self.secondary is not None \
"""Determine a list of "source"/"destination" column pairs
based on the given join condition, as well as the
foreign keys argument.
-
+
"source" would be a column referenced by a foreign key,
and "destination" would be the column who has a foreign key
reference to "source".
-
+
"""
fks = self._user_defined_foreign_keys
def _determine_synchronize_pairs(self):
"""Resolve 'primary'/foreign' column pairs from the primaryjoin
and secondaryjoin arguments.
-
+
"""
if self.local_remote_pairs:
if not self._user_defined_foreign_keys:
def _determine_direction(self):
"""Determine if this relationship is one to many, many to one,
many to many.
-
+
This is derived from the primaryjoin, presence of "secondary",
and in the case of self-referential the "remote side".
-
+
"""
if self.secondaryjoin is not None:
self.direction = MANYTOMANY
"""Determine pairs of columns representing "local" to
"remote", where "local" columns are on the parent mapper,
"remote" are on the target mapper.
-
+
These pairs are used on the load side only to generate
lazy loading clauses.
"""
for state in self.identity_map.all_states() + list(self._new):
- state.detach()
+ state._detach()
self.identity_map = self._identity_cls()
self._new = {}
state.expire(state.dict, self.identity_map._modified)
elif state in self._new:
self._new.pop(state)
- state.detach()
+ state._detach()
@util.deprecated("0.7", "The non-weak-referencing identity map "
"feature is no longer needed.")
def _expunge_state(self, state):
if state in self._new:
self._new.pop(state)
- state.detach()
+ state._detach()
elif self.identity_map.contains_state(state):
self.identity_map.discard(state)
self._deleted.pop(state, None)
- state.detach()
+ state._detach()
elif self.transaction:
self.transaction._deleted.pop(state, None)
from sqlalchemy.orm import exc as orm_exc, attributes, interfaces,\
util as orm_util
from sqlalchemy.orm.attributes import PASSIVE_OFF, PASSIVE_NO_RESULT, \
- PASSIVE_NO_FETCH, NEVER_SET, ATTR_WAS_SET, NO_VALUE
+ PASSIVE_NO_FETCH, NEVER_SET, ATTR_WAS_SET, NO_VALUE,\
+ PASSIVE_NO_INITIALIZE
mapperlib = util.importlater("sqlalchemy.orm", "mapperlib")
+sessionlib = util.importlater("sqlalchemy.orm", "session")
import sys
self.callables = {}
self.committed_state = {}
+ @util.memoized_property
+ def attr(self):
+ return util.ImmutableProperties(
+ dict(
+ (key, InspectAttr(self, key))
+ for key in self.manager
+ )
+ )
+
+ @property
+ def transient(self):
+ return self.key is None and \
+ not self._attached
+
+ @property
+ def pending(self):
+ return self.key is None and \
+ self._attached
+
+ @property
+ def persistent(self):
+ return self.key is not None and \
+ self._attached
+
+ @property
+ def detached(self):
+ return self.key is not None and \
+ not self._attached
+
+ @property
+ def _attached(self):
+ return self.session_id is not None and \
+ self.session_id in sessionlib._sessions
+
+ @property
+ def session(self):
+ return sessionlib._state_session(self)
+
+ @property
+ def object(self):
+ return self.obj()
+
+ @property
+ def identity(self):
+ if self.key is None:
+ return None
+ else:
+ return self.key[1]
+
+ @property
+ def identity_key(self):
+ # TODO: just change .key to .identity_key across
+ # the board ? probably
+ return self.key
+
@util.memoized_property
def parents(self):
return {}
@util.memoized_property
- def pending(self):
+ def _pending_mutations(self):
return {}
@util.memoized_property
def has_identity(self):
return bool(self.key)
- def detach(self):
+ def _detach(self):
self.session_id = None
- def dispose(self):
- self.detach()
+ def _dispose(self):
+ self._detach()
del self.obj
def _cleanup(self, ref):
def get_impl(self, key):
return self.manager[key].impl
- def get_pending(self, key):
- if key not in self.pending:
- self.pending[key] = PendingCollection()
- return self.pending[key]
-
- def value_as_iterable(self, dict_, key, passive=PASSIVE_OFF):
- """Return a list of tuples (state, obj) for the given
- key.
-
- returns an empty list if the value is None/empty/PASSIVE_NO_RESULT
- """
-
- impl = self.manager[key].impl
- x = impl.get(self, dict_, passive=passive)
- if x is PASSIVE_NO_RESULT or x is None:
- return []
- elif hasattr(impl, 'get_collection'):
- return [
- (attributes.instance_state(o), o) for o in
- impl.get_collection(self, dict_, x, passive=passive)
- ]
- else:
- return [(attributes.instance_state(x), x)]
+ def _get_pending_mutation(self, key):
+ if key not in self._pending_mutations:
+ self._pending_mutations[key] = PendingCollection()
+ return self._pending_mutations[key]
def __getstate__(self):
d = {'instance':self.obj()}
d.update(
(k, self.__dict__[k]) for k in (
- 'committed_state', 'pending', 'modified', 'expired',
+ 'committed_state', '_pending_mutations', 'modified', 'expired',
'callables', 'key', 'parents', 'load_options', 'mutable_dict',
'class_',
) if k in self.__dict__
mapperlib.configure_mappers()
self.committed_state = state.get('committed_state', {})
- self.pending = state.get('pending', {})
+ self._pending_mutations = state.get('_pending_mutations', {})
self.parents = state.get('parents', {})
self.modified = state.get('modified', False)
self.expired = state.get('expired', False)
self.committed_state.clear()
- self.__dict__.pop('pending', None)
+ self.__dict__.pop('_pending_mutations', None)
self.__dict__.pop('mutable_dict', None)
# clear out 'parents' collection. not
self.manager.dispatch.expire(self, None)
def expire_attributes(self, dict_, attribute_names):
- pending = self.__dict__.get('pending', None)
+ pending = self.__dict__.get('_pending_mutations', None)
mutable_dict = self.mutable_dict
for key in attribute_names:
def _is_really_none(self):
return self.obj()
- def modified_event(self, dict_, attr, previous, collection=False):
+ def _modified_event(self, dict_, attr, previous, collection=False):
if attr.key not in self.committed_state:
if collection:
if previous is NEVER_SET:
"""
self.committed_state.clear()
- self.__dict__.pop('pending', None)
+ self.__dict__.pop('_pending_mutations', None)
callables = self.callables
for key in list(callables):
self.modified = self.expired = False
self._strong_obj = None
+class InspectAttr(object):
+ """Provide inspection interface to an object's state."""
+
+ def __init__(self, state, key):
+ self.state = state
+ self.key = key
+
+ @property
+ def loaded_value(self):
+ return self.state.dict.get(self.key, NO_VALUE)
+
+ @property
+ def value(self):
+ return self.state.manager[self.key].__get__(
+ self.state.obj(), self.state.class_)
+
+ @property
+ def history(self):
+ return self.state.get_history(self.key,
+ PASSIVE_NO_INITIALIZE)
+
class MutableAttrInstanceState(InstanceState):
"""InstanceState implementation for objects that reference 'mutable'
attributes.
instance_dict = self._instance_dict()
if instance_dict:
instance_dict.discard(self)
- self.dispose()
+ self._dispose()
def __resurrect(self):
"""A substitute for the obj() weakref function which resurrects."""
# list is still here.
eq_(
set(attributes.instance_state(i1).
- pending['keywords'].added_items),
+ _pending_mutations['keywords'].added_items),
set([k2])
)
# because autoflush is off, k2 is still
# the pending collection was removed
assert 'keywords' not in attributes.\
instance_state(i1).\
- pending
+ _pending_mutations
def test_duplicate_adds(self):
Item, Keyword = (self.classes.Item, self.classes.Keyword)
"""test the inspection registry system."""
-from test.lib.testing import eq_, assert_raises
+from test.lib.testing import eq_, assert_raises, is_
from sqlalchemy import exc, util
from sqlalchemy import inspect
from test.orm import _fixtures
-from sqlalchemy.orm import class_mapper, synonym
-from sqlalchemy.orm.attributes import instance_state
+from sqlalchemy.orm import class_mapper, synonym, Session
+from sqlalchemy.orm.attributes import instance_state, NO_VALUE
+from test.lib import testing
class TestORMInspection(_fixtures.FixtureTest):
@classmethod
assert inspect(u1) is instance_state(u1)
- def test_synonyms(self):
+ def test_column_collection_iterate(self):
User = self.classes.User
- syn = inspect(User).synonyms
-
- # TODO: some of the synonym debacle in 0.7
- # has led User.name_syn.property to be the
- # ColumnProperty. not sure if we want that
- # implicit jump in there though, perhaps get Query/etc. to
- # call upon "effective_property" or something like that
-
- eq_(inspect(User).synonyms, {
- "name_syn":class_mapper(User).get_property("name_syn")
- })
-
- # TODO: test all these accessors...
-
-"""
-# column collection
->>> b.columns
-[<id column>, <name column>]
-
-# its a ColumnCollection
->>> b.columns.id
-<id column>
-
-# i.e. from mapper
->>> b.primary_key
-(<id column>, )
-
-# i.e. from mapper
->>> b.local_table
-<user table>
-
-# ColumnProperty
->>> b.attr.id.columns
-[<id column>]
-
-# but perhaps we use a collection with some helpers
->>> b.attr.id.columns.first
-<id column>
-
-# and a mapper? its None since this is a column
->>> b.attr.id.mapper
-None
-
-# attr is basically the _props
->>> b.attr.keys()
-['id', 'name', 'name_syn', 'addresses']
-
-# b itself is likely just the mapper
->>> b
-<User mapper>
+ user_table = self.tables.users
+ insp = inspect(User)
+ eq_(
+ list(insp.columns),
+ [user_table.c.id, user_table.c.name]
+ )
+ is_(
+ insp.columns.id, user_table.c.id
+ )
-# get only column attributes
->>> b.column_attrs
-[<id prop>, <name prop>]
+ def test_primary_key(self):
+ User = self.classes.User
+ user_table = self.tables.users
+ insp = inspect(User)
+ eq_(insp.primary_key,
+ (user_table.c.id,)
+ )
-# its a namespace
->>> b.column_attrs.id
-<id prop>
+ def test_local_table(self):
+ User = self.classes.User
+ user_table = self.tables.users
+ insp = inspect(User)
+ is_(insp.local_table, user_table)
-# get only synonyms
->>> b.synonyms
-[<name syn prop>]
+ def test_property(self):
+ User = self.classes.User
+ user_table = self.tables.users
+ insp = inspect(User)
+ is_(insp.attr.id, class_mapper(User).get_property('id'))
-# get only relationships
->>> b.relationships
-[<addresses prop>]
+ def test_col_property(self):
+ User = self.classes.User
+ user_table = self.tables.users
+ insp = inspect(User)
+ id_prop = insp.attr.id
-# its a namespace
->>> b.relationships.addresses
-<addresses prop>
+ eq_(id_prop.columns, [user_table.c.id])
+ is_(id_prop.expression, user_table.c.id)
-# point inspect() at a class level attribute,
-# basically returns ".property"
->>> b = inspect(User.addresses)
->>> b
-<addresses prop>
+ assert not hasattr(id_prop, 'mapper')
-# mapper
->>> b.mapper
-<Address mapper>
+ def test_attr_keys(self):
+ User = self.classes.User
+ insp = inspect(User)
+ eq_(
+ set(insp.attr.keys()),
+ set(['addresses', 'orders', 'id', 'name', 'name_syn'])
+ )
-# None columns collection, just like columnprop has empty mapper
->>> b.columns
-None
+ def test_col_filter(self):
+ User = self.classes.User
+ insp = inspect(User)
+ eq_(
+ list(insp.column_attrs),
+ [insp.get_property('id'), insp.get_property('name')]
+ )
+ eq_(
+ insp.column_attrs.keys(),
+ ['id', 'name']
+ )
+ is_(
+ insp.column_attrs.id,
+ User.id.property
+ )
-# the parent
->>> b.parent
-<User mapper>
+ def test_synonym_filter(self):
+ User = self.classes.User
+ syn = inspect(User).synonyms
-# __clause_element__()
->>> b.expression
-User.id==Address.user_id
+ eq_(
+ list(syn.keys()), ['name_syn']
+ )
+ is_(syn.name_syn, User.name_syn.original_property)
+ eq_(dict(syn), {
+ "name_syn":User.name_syn.original_property
+ })
->>> inspect(User.id).expression
-<id column with ORM annotations>
+ def test_relationship_filter(self):
+ User = self.classes.User
+ rel = inspect(User).relationships
+ eq_(
+ rel.addresses,
+ User.addresses.property
+ )
+ eq_(
+ set(rel.keys()),
+ set(['orders', 'addresses'])
+ )
-# inspect works on instances !
->>> u1 = User(id=3, name='x')
->>> b = inspect(u1)
+ def test_insp_prop(self):
+ User = self.classes.User
+ prop = inspect(User.addresses)
+ is_(prop, User.addresses.property)
-# what's b here ? probably InstanceState
->>> b
-<InstanceState>
+ def test_rel_accessors(self):
+ User = self.classes.User
+ Address = self.classes.Address
+ prop = inspect(User.addresses)
+ is_(prop.parent, class_mapper(User))
+ is_(prop.mapper, class_mapper(Address))
->>> b.attr.keys()
-['id', 'name', 'name_syn', 'addresses']
+ assert not hasattr(prop, 'columns')
+ assert not hasattr(prop, 'expression')
-# this is class level stuff - should this require b.mapper.columns ?
->>> b.columns
-[<id column>, <name column>]
+ def test_instance_state(self):
+ User = self.classes.User
+ u1 = User()
+ insp = inspect(u1)
+ is_(insp, instance_state(u1))
-# does this return '3'? or an object?
->>> b.attr.id
-<magic attribute inspect thing>
+ def test_instance_state_attr(self):
+ User = self.classes.User
+ u1 = User(name='ed')
+ insp = inspect(u1)
-# or does this ?
->>> b.attr.id.value
-3
+ eq_(
+ set(insp.attr.keys()),
+ set(['id', 'name', 'name_syn', 'addresses', 'orders'])
+ )
+ eq_(
+ insp.attr.name.value,
+ 'ed'
+ )
+ eq_(
+ insp.attr.name.loaded_value,
+ 'ed'
+ )
->>> b.attr.id.history
-<history object>
+ def test_instance_state_attr_passive_value_scalar(self):
+ User = self.classes.User
+ u1 = User(name='ed')
+ insp = inspect(u1)
+ # value was not set, NO_VALUE
+ eq_(
+ insp.attr.id.loaded_value,
+ NO_VALUE
+ )
+ # regular accessor sets it
+ eq_(
+ insp.attr.id.value,
+ None
+ )
+ # now the None is there
+ eq_(
+ insp.attr.id.loaded_value,
+ None
+ )
->>> b.attr.id.history.unchanged
-3
+ def test_instance_state_attr_passive_value_collection(self):
+ User = self.classes.User
+ u1 = User(name='ed')
+ insp = inspect(u1)
+ # value was not set, NO_VALUE
+ eq_(
+ insp.attr.addresses.loaded_value,
+ NO_VALUE
+ )
+ # regular accessor sets it
+ eq_(
+ insp.attr.addresses.value,
+ []
+ )
+ # now the None is there
+ eq_(
+ insp.attr.addresses.loaded_value,
+ []
+ )
->>> b.attr.id.history.deleted
-None
+ def test_instance_state_attr_hist(self):
+ User = self.classes.User
+ u1 = User(name='ed')
+ insp = inspect(u1)
+ hist = insp.attr.addresses.history
+ eq_(
+ hist.unchanged, None
+ )
+ u1.addresses
+ hist = insp.attr.addresses.history
+ eq_(
+ hist.unchanged, []
+ )
-# lets assume the object is persistent
->>> s = Session()
->>> s.add(u1)
->>> s.commit()
+ def test_instance_state_ident_transient(self):
+ User = self.classes.User
+ u1 = User(name='ed')
+ insp = inspect(u1)
+ is_(insp.identity, None)
-# big one - the primary key identity ! always
-# works in query.get()
->>> b.identity
-[3]
+ def test_instance_state_ident_persistent(self):
+ User = self.classes.User
+ u1 = User(name='ed')
+ s = Session(testing.db)
+ s.add(u1)
+ s.flush()
+ insp = inspect(u1)
+ eq_(insp.identity, (u1.id,))
+ is_(s.query(User).get(insp.identity), u1)
+
+ def test_identity_key(self):
+ User = self.classes.User
+ u1 = User(name='ed')
+ s = Session(testing.db)
+ s.add(u1)
+ s.flush()
+ insp = inspect(u1)
+ eq_(
+ insp.identity_key,
+ (User, (11, ))
+ )
-# the mapper level key
->>> b.identity_key
-(User, [3])
+ def test_persistence_states(self):
+ User = self.classes.User
+ u1 = User(name='ed')
+ insp = inspect(u1)
->>> b.persistent
-True
+ eq_(
+ (insp.transient, insp.pending,
+ insp.persistent, insp.detached),
+ (True, False, False, False)
+ )
+ s = Session(testing.db)
+ s.add(u1)
->>> b.transient
-False
+ eq_(
+ (insp.transient, insp.pending,
+ insp.persistent, insp.detached),
+ (False, True, False, False)
+ )
->>> b.deleted
-False
+ s.flush()
+ eq_(
+ (insp.transient, insp.pending,
+ insp.persistent, insp.detached),
+ (False, False, True, False)
+ )
+ s.expunge(u1)
+ eq_(
+ (insp.transient, insp.pending,
+ insp.persistent, insp.detached),
+ (False, False, False, True)
+ )
->>> b.detached
-False
+ def test_session_accessor(self):
+ User = self.classes.User
+ u1 = User(name='ed')
+ insp = inspect(u1)
->>> b.session
-<session>
+ is_(insp.session, None)
+ s = Session()
+ s.add(u1)
+ is_(insp.session, s)
-# the object. this navigates obj()
-# of course, would be nice if it was b.obj...
->>> b.object_
-<User instance u1>
+ def test_object_accessor(self):
+ User = self.classes.User
+ u1 = User(name='ed')
+ insp = inspect(u1)
+ is_(insp.object, u1)
-"""
for obj in objs:
state = attributes.instance_state(obj)
sess.identity_map.discard(state)
- state.dispose()
+ state._dispose()
def _test_session(self, **kwargs):
global sess