self.proxy_bulk_set = proxy_bulk_set
self.cascade_scalar_deletes = cascade_scalar_deletes
- self.owning_class = None
self.key = '_%s_%s_%s' % (
type(self).__name__, target_collection, id(self))
- self.collection_class = None
if info:
self.info = info
+ def __get__(self, obj, class_):
+ if class_ is None:
+ return self
+ inst = self._as_instance(class_)
+ if inst:
+ return inst.get(obj)
+ else:
+ return self
+
+ def __set__(self, obj, values):
+ class_ = type(obj)
+ return self._as_instance(class_).set(obj, values)
+
+ def __delete__(self, obj):
+ class_ = type(obj)
+ return self._as_instance(class_).delete(obj)
+
+ def for_class(self, class_):
+ """Return the internal state local to a specific mapped class.
+
+ E.g., given a class ``User``::
+
+ class User(Base):
+ # ...
+
+ keywords = association_proxy('kws', 'keyword')
+
+ If we access this :class:`.AssociationProxy` from
+ :attr:`.Mapper.all_orm_descriptors`, and we want to view the
+ target class for this proxy as mapped by ``User``::
+
+ inspect(User).all_orm_descriptors["keywords"].for_class(User).target_class
+
+ This returns an instance of :class:`.AssociationProxyInstance` that
+ is specific to the ``User`` class. The :class:`.AssociationProxy`
+ object remains agnostic of its parent class.
+
+ .. versionadded:: 1.3 - :class:`.AssociationProxy` no longer stores
+ any state specific to a particular parent class; the state is now
+ stored in per-class :class:`.AssociationProxyInstance` objects.
+
+
+ """
+ return self._as_instance(class_)
+
+ def _as_instance(self, class_):
+ try:
+ return class_.__dict__[self.key + "_inst"]
+ except KeyError:
+ owner = self._calc_owner(class_)
+ if owner is not None:
+ result = AssociationProxyInstance(self, owner)
+ setattr(class_, self.key + "_inst", result)
+ return result
+ else:
+ return None
+
+ def _calc_owner(self, target_cls):
+ # we might be getting invoked for a subclass
+ # that is not mapped yet, in some declarative situations.
+ # save until we are mapped
+ try:
+ insp = inspect(target_cls)
+ except exc.NoInspectionAvailable:
+ # can't find a mapper, don't set owner. if we are a not-yet-mapped
+ # subclass, we can also scan through __mro__ to find a mapped
+ # class, but instead just wait for us to be called again against a
+ # mapped class normally.
+ return None
+ else:
+ return insp.mapper.class_manager.class_
+
+ def _default_getset(self, collection_class):
+ attr = self.value_attr
+ _getter = operator.attrgetter(attr)
+
+ def getter(target):
+ return _getter(target) if target is not None else None
+ if collection_class is dict:
+ def setter(o, k, v):
+ setattr(o, attr, v)
+ else:
+ def setter(o, v):
+ setattr(o, attr, v)
+ return getter, setter
+
+
+class AssociationProxyInstance(object):
+ """A per-class object that serves class- and object-specific results.
+
+ This is used by :class:`.AssociationProxy` when it is invoked
+ in terms of a specific class or instance of a class, i.e. when it is
+ used as a regular Python descriptor.
+
+ When referring to the :class:`.AssociationProxy` as a normal Python
+ descriptor, the :class:`.AssociationProxyInstance` is the object that
+ actually serves the information. Under normal circumstances, its presence
+ is transparent::
+
+ >>> User.keywords.scalar
+ False
+
+ In the special case that the :class:`.AssociationProxy` object is being
+ accessed directly, in order to get an explicit handle to the
+ :class:`.AssociationProxyInstance`, use the
+ :meth:`.AssociationProxy.for_class` method::
+
+ proxy_state = inspect(User).all_orm_descriptors["keywords"].for_class(User)
+
+ # view if proxy object is scalar or not
+ >>> proxy_state.scalar
+ False
+
+ .. versionadded:: 1.3
+
+ """
+
+ def __init__(self, parent, owning_class):
+ self.parent = parent
+ self.key = parent.key
+ self.owning_class = owning_class
+ self.target_collection = parent.target_collection
+ self.value_attr = parent.value_attr
+ self.collection_class = None
+
+ def _get_property(self):
+ return orm.class_mapper(self.owning_class).\
+ get_property(self.target_collection)
+
+ @property
+ def _comparator(self):
+ return self._get_property().comparator
+
+ @util.memoized_property
+ def _unwrap_target_assoc_proxy(self):
+ attr = getattr(self.target_class, self.value_attr)
+ if isinstance(attr, (AssociationProxy, AssociationProxyInstance)):
+ return attr
+ return None
+
@property
def remote_attr(self):
"""The 'remote' :class:`.MapperProperty` referenced by this
- :class:`.AssociationProxy`.
-
- .. versionadded:: 0.7.3
+ :class:`.AssociationProxyInstance`.
- See also:
+ ..seealso::
- :attr:`.AssociationProxy.attr`
+ :attr:`.AssociationProxyInstance.attr`
- :attr:`.AssociationProxy.local_attr`
+ :attr:`.AssociationProxyInstance.local_attr`
"""
return getattr(self.target_class, self.value_attr)
@property
def local_attr(self):
"""The 'local' :class:`.MapperProperty` referenced by this
- :class:`.AssociationProxy`.
-
- .. versionadded:: 0.7.3
+ :class:`.AssociationProxyInstance`.
- See also:
+ .. seealso::
- :attr:`.AssociationProxy.attr`
+ :attr:`.AssociationProxyInstance.attr`
- :attr:`.AssociationProxy.remote_attr`
+ :attr:`.AssociationProxyInstance.remote_attr`
"""
return getattr(self.owning_class, self.target_collection)
sess.query(Parent).join(*Parent.proxied.attr)
- .. versionadded:: 0.7.3
+ .. seealso::
- See also:
+ :attr:`.AssociationProxyInstance.local_attr`
- :attr:`.AssociationProxy.local_attr`
-
- :attr:`.AssociationProxy.remote_attr`
+ :attr:`.AssociationProxyInstance.remote_attr`
"""
return (self.local_attr, self.remote_attr)
- def _get_property(self):
- owning_class = self.owning_class
- if owning_class is None:
- raise exc.InvalidRequestError(
- "This association proxy has no mapped owning class; "
- "can't locate a mapped property")
- return (orm.class_mapper(owning_class).
- get_property(self.target_collection))
-
@util.memoized_property
def target_class(self):
- """The intermediary class handled by this :class:`.AssociationProxy`.
+ """The intermediary class handled by this
+ :class:`.AssociationProxyInstance`.
Intercepted append/set/assignment events will result
in the generation of new instances of this class.
@util.memoized_property
def scalar(self):
- """Return ``True`` if this :class:`.AssociationProxy` proxies a scalar
- relationship on the local side."""
+ """Return ``True`` if this :class:`.AssociationProxyInstance`
+ proxies a scalar relationship on the local side."""
scalar = not self._get_property().uselist
if scalar:
def _target_is_object(self):
return getattr(self.target_class, self.value_attr).impl.uses_objects
- def _calc_owner(self, obj, class_):
- if obj is not None and class_ is None:
- target_cls = type(obj)
- elif class_ is not None:
- target_cls = class_
+ def _initialize_scalar_accessors(self):
+ if self.parent.getset_factory:
+ get, set = self.parent.getset_factory(None, self)
else:
- return
+ get, set = self.parent._default_getset(None)
+ self._scalar_get, self._scalar_set = get, set
- # we might be getting invoked for a subclass
- # that is not mapped yet, in some declarative situations.
- # save until we are mapped
- try:
- insp = inspect(target_cls)
- except exc.NoInspectionAvailable:
- # can't find a mapper, don't set owner. if we are a not-yet-mapped
- # subclass, we can also scan through __mro__ to find a mapped
- # class, but instead just wait for us to be called again against a
- # mapped class normally.
- return
+ def _default_getset(self, collection_class):
+ attr = self.value_attr
+ _getter = operator.attrgetter(attr)
- # note we can get our real .key here too
- owner = insp.mapper.class_manager._locate_owning_manager(self)
- if owner is not None:
- self.owning_class = owner.class_
+ def getter(target):
+ return _getter(target) if target is not None else None
+ if collection_class is dict:
+ def setter(o, k, v):
+ return setattr(o, attr, v)
else:
- # the proxy is attached to a class that is not mapped
- # (like a mixin), we are mapped, so, it's us.
- self.owning_class = target_cls
+ def setter(o, v):
+ return setattr(o, attr, v)
+ return getter, setter
- def __get__(self, obj, class_):
- if self.owning_class is None:
- self._calc_owner(obj, class_)
+ @property
+ def info(self):
+ return self.parent.info
+ def get(self, obj):
if obj is None:
return self
try:
# If the owning instance is reborn (orm session resurrect,
# etc.), refresh the proxy cache.
- creator_id, proxy = getattr(obj, self.key)
- if id(obj) == creator_id:
- return proxy
+ creator_id, self_id, proxy = getattr(obj, self.key)
except AttributeError:
pass
- proxy = self._new(_lazy_collection(obj, self.target_collection))
- setattr(obj, self.key, (id(obj), proxy))
- return proxy
+ else:
+ if id(obj) == creator_id and id(self) == self_id:
+ assert self.collection_class is not None
+ return proxy
- def __set__(self, obj, values):
- if self.owning_class is None:
- self._calc_owner(obj, None)
+ self.collection_class, proxy = self._new(
+ _lazy_collection(obj, self.target_collection))
+ setattr(obj, self.key, (id(obj), id(self), proxy))
+ return proxy
+ def set(self, obj, values):
if self.scalar:
- creator = self.creator and self.creator or self.target_class
+ creator = self.parent.creator \
+ if self.parent.creator else self.target_class
target = getattr(obj, self.target_collection)
if target is None:
if values is None:
setattr(obj, self.target_collection, creator(values))
else:
self._scalar_set(target, values)
- if values is None and self.cascade_scalar_deletes:
+ if values is None and self.parent.cascade_scalar_deletes:
setattr(obj, self.target_collection, None)
else:
- proxy = self.__get__(obj, None)
+ proxy = self.get(obj)
+ assert self.collection_class is not None
if proxy is not values:
proxy.clear()
self._set(proxy, values)
- def __delete__(self, obj):
+ def delete(self, obj):
if self.owning_class is None:
self._calc_owner(obj, None)
delattr(target, self.value_attr)
delattr(obj, self.target_collection)
- def _initialize_scalar_accessors(self):
- if self.getset_factory:
- get, set = self.getset_factory(None, self)
- else:
- get, set = self._default_getset(None)
- self._scalar_get, self._scalar_set = get, set
-
- def _default_getset(self, collection_class):
- attr = self.value_attr
- _getter = operator.attrgetter(attr)
-
- def getter(target):
- return _getter(target) if target is not None else None
-
- if collection_class is dict:
- def setter(o, k, v):
- setattr(o, attr, v)
- else:
- def setter(o, v):
- setattr(o, attr, v)
- return getter, setter
-
def _new(self, lazy_collection):
- creator = self.creator and self.creator or self.target_class
- self.collection_class = util.duck_type_collection(lazy_collection())
+ creator = self.parent.creator if self.parent.creator else \
+ self.target_class
+ collection_class = util.duck_type_collection(lazy_collection())
- if self.proxy_factory:
- return self.proxy_factory(
+ if self.parent.proxy_factory:
+ return collection_class, self.parent.proxy_factory(
lazy_collection, creator, self.value_attr, self)
- if self.getset_factory:
- getter, setter = self.getset_factory(self.collection_class, self)
+ if self.parent.getset_factory:
+ getter, setter = self.parent.getset_factory(
+ collection_class, self)
else:
- getter, setter = self._default_getset(self.collection_class)
+ getter, setter = self.parent._default_getset(collection_class)
- if self.collection_class is list:
- return _AssociationList(
+ if collection_class is list:
+ return collection_class, _AssociationList(
lazy_collection, creator, getter, setter, self)
- elif self.collection_class is dict:
- return _AssociationDict(
+ elif collection_class is dict:
+ return collection_class, _AssociationDict(
lazy_collection, creator, getter, setter, self)
- elif self.collection_class is set:
- return _AssociationSet(
+ elif collection_class is set:
+ return collection_class, _AssociationSet(
lazy_collection, creator, getter, setter, self)
else:
raise exc.ArgumentError(
'proxy_factory and proxy_bulk_set manually' %
(self.collection_class.__name__, self.target_collection))
- def _inflate(self, proxy):
- creator = self.creator and self.creator or self.target_class
-
- if self.getset_factory:
- getter, setter = self.getset_factory(self.collection_class, self)
- else:
- getter, setter = self._default_getset(self.collection_class)
-
- proxy.creator = creator
- proxy.getter = getter
- proxy.setter = setter
-
def _set(self, proxy, values):
- if self.proxy_bulk_set:
- self.proxy_bulk_set(proxy, values)
+ if self.parent.proxy_bulk_set:
+ self.parent.proxy_bulk_set(proxy, values)
elif self.collection_class is list:
proxy.extend(values)
elif self.collection_class is dict:
'no proxy_bulk_set supplied for custom '
'collection_class implementation')
- @property
- def _comparator(self):
- return self._get_property().comparator
+ def _inflate(self, proxy):
+ creator = self.parent.creator and \
+ self.parent.creator or self.target_class
- @util.memoized_property
- def _unwrap_target_assoc_proxy(self):
- attr = getattr(self.target_class, self.value_attr)
- if isinstance(attr, AssociationProxy):
- return attr
- return None
+ if self.parent.getset_factory:
+ getter, setter = self.parent.getset_factory(
+ self.collection_class, self)
+ else:
+ getter, setter = self.parent._default_getset(self.collection_class)
+
+ proxy.creator = creator
+ proxy.getter = getter
+ proxy.setter = setter
def _criterion_exists(self, criterion=None, **kwargs):
is_has = kwargs.pop('is_has', None)
return self._comparator.has(
getattr(self.target_class, self.value_attr) != obj)
- def __repr__(self):
- return "AssociationProxy(%r, %r)" % (
- self.target_collection, self.value_attr)
-
class _lazy_collection(object):
def __init__(self, obj, target):
__tablename__ = 'subparent'
id = Column(Integer, ForeignKey(Parent.id), primary_key=True)
- is_(SubParent.children.owning_class, Parent)
+ is_(SubParent.children.owning_class, SubParent)
+ is_(Parent.children.owning_class, Parent)
def test_resolved_to_correct_class_two(self):
Base = declarative_base()
__tablename__ = 'subsubparent'
id = Column(Integer, ForeignKey(SubParent.id), primary_key=True)
- is_(SubSubParent.children.owning_class, SubParent)
+ is_(SubParent.children.owning_class, SubParent)
+ is_(SubSubParent.children.owning_class, SubSubParent)
def test_resolved_to_correct_class_four(self):
Base = declarative_base()
sp = SubParent()
sp.children = 'c'
- is_(SubParent.children.owning_class, Parent)
+ is_(SubParent.children.owning_class, SubParent)
+ is_(Parent.children.owning_class, Parent)
def test_resolved_to_correct_class_five(self):
Base = declarative_base()
is_(Parent.children.owning_class, Parent)
eq_(p1.children, ["c1"])
- def test_never_assign_nonetype(self):
+ def _test_never_assign_nonetype(self):
foo = association_proxy('x', 'y')
foo._calc_owner(None, None)
is_(foo.owning_class, None)
Foob.assoc.info["foo"] = 'bar'
eq_(Foob.assoc.info, {'foo': 'bar'})
+
+class MultiOwnerTest(fixtures.DeclarativeMappedTest,
+ testing.AssertsCompiledSQL):
+ __dialect__ = 'default'
+
+ @classmethod
+ def setup_classes(cls):
+ Base = cls.DeclarativeBasic
+
+ class A(Base):
+ __tablename__ = 'a'
+ id = Column(Integer, primary_key=True)
+ type = Column(String(5), nullable=False)
+ d_values = association_proxy("ds", "value")
+
+ __mapper_args__ = {"polymorphic_on": type}
+
+ class B(A):
+ __tablename__ = 'b'
+ id = Column(ForeignKey('a.id'), primary_key=True)
+
+ ds = relationship("D", primaryjoin="D.b_id == B.id")
+
+ __mapper_args__ = {"polymorphic_identity": "b"}
+
+ class C(A):
+ __tablename__ = 'c'
+ id = Column(ForeignKey('a.id'), primary_key=True)
+
+ ds = relationship("D", primaryjoin="D.c_id == C.id")
+
+ __mapper_args__ = {"polymorphic_identity": "c"}
+
+ class C2(C):
+ __tablename__ = 'c2'
+ id = Column(ForeignKey('c.id'), primary_key=True)
+
+ ds = relationship("D", primaryjoin="D.c2_id == C2.id")
+
+ __mapper_args__ = {"polymorphic_identity": "c2"}
+
+ class D(Base):
+ __tablename__ = 'd'
+ id = Column(Integer, primary_key=True)
+ value = Column(String(50))
+ b_id = Column(ForeignKey('b.id'))
+ c_id = Column(ForeignKey('c.id'))
+ c2_id = Column(ForeignKey('c2.id'))
+
+ def test_any_has(self):
+ B, C, C2 = self.classes("B", "C", "C2")
+
+ self.assert_compile(
+ B.d_values.contains('b1'),
+ "EXISTS (SELECT 1 FROM d, b WHERE d.b_id = b.id "
+ "AND d.value = :value_1)"
+ )
+
+ self.assert_compile(
+ C2.d_values.contains("c2"),
+ "EXISTS (SELECT 1 FROM d, c2 WHERE d.c2_id = c2.id "
+ "AND d.value = :value_1)"
+ )
+
+ self.assert_compile(
+ C.d_values.contains('c1'),
+ "EXISTS (SELECT 1 FROM d, c WHERE d.c_id = c.id "
+ "AND d.value = :value_1)"
+ )
+