--- /dev/null
+.. change::
+ :tags: bug, ext
+ :tickets: 4401
+
+ Fixed a regression in 1.3.0b1 caused by :ticket:`3423` where association
+ proxy objects that access an attribute that's only present on a polymorphic
+ subclass would raise an ``AttributeError`` even though the actual instance
+ being accessed was an instance of that subclass.
def __get__(self, obj, class_):
if class_ is None:
return self
- inst = self._as_instance(class_)
+ inst = self._as_instance(class_, obj)
if inst:
return inst.get(obj)
- else:
- return self
+
+ # obj has to be None here
+ # assert obj is None
+
+ return self
def __set__(self, obj, values):
class_ = type(obj)
- return self._as_instance(class_).set(obj, values)
+ return self._as_instance(class_, obj).set(obj, values)
def __delete__(self, obj):
class_ = type(obj)
- return self._as_instance(class_).delete(obj)
+ return self._as_instance(class_, obj).delete(obj)
- def for_class(self, class_):
+ def for_class(self, class_, obj=None):
"""Return the internal state local to a specific mapped class.
E.g., given a class ``User``::
is specific to the ``User`` class. The :class:`.AssociationProxy`
object remains agnostic of its parent class.
+ :param class_: the class that we are returning state for.
+
+ :param obj: optional, an instance of the class that is required
+ if the attribute refers to a polymorphic target, e.g. where we have
+ to look at the type of the actual destination object to get the
+ complete path.
+
.. versionadded:: 1.3 - :class:`.AssociationProxy` no longer stores
any state specific to a particular parent class; the state is now
stored in per-class :class:`.AssociationProxyInstance` objects.
"""
- return self._as_instance(class_)
+ return self._as_instance(class_, obj)
- def _as_instance(self, class_):
+ def _as_instance(self, class_, obj):
try:
- return class_.__dict__[self.key + "_inst"]
+ inst = class_.__dict__[self.key + "_inst"]
except KeyError:
owner = self._calc_owner(class_)
if owner is not None:
- result = AssociationProxyInstance.for_proxy(self, owner)
- setattr(class_, self.key + "_inst", result)
- return result
+ inst = AssociationProxyInstance.for_proxy(self, owner, obj)
+ setattr(class_, self.key + "_inst", inst)
else:
- return None
+ inst = None
+
+ if inst is not None and not inst._is_canonical:
+ # the AssociationProxyInstance can't be generalized
+ # since the proxied attribute is not on the targeted
+ # class, only on subclasses of it, which might be
+ # different. only return for the specific
+ # object's current value
+ return inst._non_canonical_get_for_object(obj)
+ else:
+ return inst
def _calc_owner(self, target_cls):
# we might be getting invoked for a subclass
self.key = parent.key
self.owning_class = owning_class
self.target_collection = parent.target_collection
- self.value_attr = parent.value_attr
self.collection_class = None
self.target_class = target_class
self.value_attr = value_attr
"""
@classmethod
- def for_proxy(cls, parent, owning_class):
+ def for_proxy(cls, parent, owning_class, parent_instance):
target_collection = parent.target_collection
value_attr = parent.value_attr
prop = orm.class_mapper(owning_class).\
get_property(target_collection)
+
+ # this was never asserted before but this should be made clear.
+ if not isinstance(prop, orm.RelationshipProperty):
+ raise NotImplementedError(
+ "association proxy to a non-relationship "
+ "intermediary is not supported")
+
target_class = prop.mapper.class_
- target_assoc = cls._cls_unwrap_target_assoc_proxy(
- target_class, value_attr)
+ try:
+ target_assoc = cls._cls_unwrap_target_assoc_proxy(
+ target_class, value_attr)
+ except AttributeError:
+ # the proxied attribute doesn't exist on the target class;
+ # return an "ambiguous" instance that will work on a per-object
+ # basis
+ return AmbiguousAssociationProxyInstance(
+ parent, owning_class, target_class, value_attr
+ )
+ else:
+ return cls._construct_for_assoc(
+ target_assoc, parent, owning_class, target_class, value_attr
+ )
+
+ @classmethod
+ def _construct_for_assoc(
+ cls, target_assoc, parent, owning_class,
+ target_class, value_attr):
if target_assoc is not None:
return ObjectAssociationProxyInstance(
parent, owning_class, target_class, value_attr
criterion=criterion, is_has=True, **kwargs)
+class AmbiguousAssociationProxyInstance(AssociationProxyInstance):
+ """an :class:`.AssociationProxyInstance` where we cannot determine
+ the type of target object.
+ """
+
+ _is_canonical = False
+
+ def _ambiguous(self):
+ raise AttributeError(
+ "Association proxy %s.%s refers to an attribute '%s' that is not "
+ "directly mapped on class %s; therefore this operation cannot "
+ "proceed since we don't know what type of object is referred "
+ "towards" % (
+ self.owning_class.__name__, self.target_collection,
+ self.value_attr, self.target_class
+ ))
+
+ def get(self, obj):
+ self._ambiguous()
+
+ def set(self, obj, values):
+ self._ambiguous()
+
+ def delete(self, obj):
+ self._ambiguous()
+
+ def any(self, criterion=None, **kwargs):
+ self._ambiguous()
+
+ def has(self, criterion=None, **kwargs):
+ self._ambiguous()
+
+ @util.memoized_property
+ def _lookup_cache(self):
+ # mapping of <subclass>->AssociationProxyInstance.
+ # e.g. proxy is A-> A.b -> B -> B.b_attr, but B.b_attr doesn't exist;
+ # only B1(B) and B2(B) have "b_attr", keys in here would be B1, B2
+ return {}
+
+ def _non_canonical_get_for_object(self, parent_instance):
+ if parent_instance is not None:
+ actual_obj = getattr(parent_instance, self.target_collection)
+ if actual_obj is not None:
+ instance_class = type(actual_obj)
+ if instance_class not in self._lookup_cache:
+ self._populate_cache(instance_class)
+
+ try:
+ return self._lookup_cache[instance_class]
+ except KeyError:
+ pass
+
+ # no object or ambiguous object given, so return "self", which
+ # is a no-op proxy.
+ return self
+
+ def _populate_cache(self, instance_class):
+ prop = orm.class_mapper(self.owning_class).\
+ get_property(self.target_collection)
+
+ if inspect(instance_class).mapper.isa(prop.mapper):
+ target_class = instance_class
+ try:
+ target_assoc = self._cls_unwrap_target_assoc_proxy(
+ target_class, self.value_attr)
+ except AttributeError:
+ pass
+ else:
+ self._lookup_cache[instance_class] = \
+ self._construct_for_assoc(
+ target_assoc, self.parent, self.owning_class,
+ target_class, self.value_attr
+ )
+
+
class ObjectAssociationProxyInstance(AssociationProxyInstance):
"""an :class:`.AssociationProxyInstance` that has an object as a target.
"""
_target_is_object = True
+ _is_canonical = True
def contains(self, obj):
"""Produce a proxied 'contains' expression using EXISTS.
target.
"""
_target_is_object = False
+ _is_canonical = True
def __eq__(self, other):
# special case "is None" to check for no related row as well
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.engine import default
+from sqlalchemy import inspect
+
class DictCollection(dict):
@collection.appender
eq_(Foob.assoc.info, {'foo': 'bar'})
-class MultiOwnerTest(fixtures.DeclarativeMappedTest,
- testing.AssertsCompiledSQL):
+class OnlyRelationshipTest(fixtures.DeclarativeMappedTest):
+ run_define_tables = None
+ run_create_tables = None
+ run_inserts = None
+ run_deletes = None
+
+ @classmethod
+ def setup_classes(cls):
+ Base = cls.DeclarativeBasic
+
+ class Foo(Base):
+ __tablename__ = 'foo'
+
+ id = Column(Integer, primary_key=True)
+ foo = Column(String) # assume some composite datatype
+
+ bar = association_proxy("foo", "attr")
+
+ def test_setattr(self):
+ Foo = self.classes.Foo
+
+ f1 = Foo()
+
+ assert_raises_message(
+ NotImplementedError,
+ "association proxy to a non-relationship "
+ "intermediary is not supported",
+ setattr, f1, 'bar', 'asdf'
+ )
+
+ def test_getattr(self):
+ Foo = self.classes.Foo
+
+ f1 = Foo()
+
+ assert_raises_message(
+ NotImplementedError,
+ "association proxy to a non-relationship "
+ "intermediary is not supported",
+ getattr, f1, 'bar'
+ )
+
+ def test_get_class_attr(self):
+ Foo = self.classes.Foo
+
+ assert_raises_message(
+ NotImplementedError,
+ "association proxy to a non-relationship "
+ "intermediary is not supported",
+ getattr, Foo, 'bar'
+ )
+
+
+class MultiOwnerTest(
+ fixtures.DeclarativeMappedTest,
+ testing.AssertsCompiledSQL):
__dialect__ = 'default'
+ run_define_tables = 'each'
+ run_create_tables = None
+ run_inserts = None
+ run_deletes = None
+ run_setup_classes = 'each'
+ run_setup_mappers = 'each'
+
@classmethod
def setup_classes(cls):
Base = cls.DeclarativeBasic
__tablename__ = 'b'
id = Column(ForeignKey('a.id'), primary_key=True)
+ c1_id = Column(ForeignKey("c1.id"))
+
ds = relationship("D", primaryjoin="D.b_id == B.id")
__mapper_args__ = {"polymorphic_identity": "b"}
__tablename__ = 'c'
id = Column(ForeignKey('a.id'), primary_key=True)
- ds = relationship("D", primaryjoin="D.c_id == C.id")
+ ds = relationship(
+ "D", primaryjoin="D.c_id == C.id", back_populates="c")
__mapper_args__ = {"polymorphic_identity": "c"}
+ class C1(C):
+ __tablename__ = 'c1'
+ id = Column(ForeignKey('c.id'), primary_key=True)
+
+ csub_only_data = relationship("B") # uselist=True relationship
+
+ ds = relationship(
+ "D", primaryjoin="D.c1_id == C1.id", back_populates="c")
+
+ __mapper_args__ = {"polymorphic_identity": "c1"}
+
class C2(C):
__tablename__ = 'c2'
id = Column(ForeignKey('c.id'), primary_key=True)
- ds = relationship("D", primaryjoin="D.c2_id == C2.id")
+ csub_only_data = Column(String(50)) # scalar Column
+
+ ds = relationship(
+ "D", primaryjoin="D.c2_id == C2.id", back_populates="c")
__mapper_args__ = {"polymorphic_identity": "c2"}
value = Column(String(50))
b_id = Column(ForeignKey('b.id'))
c_id = Column(ForeignKey('c.id'))
+ c1_id = Column(ForeignKey('c1.id'))
c2_id = Column(ForeignKey('c2.id'))
+ c = relationship("C", primaryjoin="D.c_id == C.id")
+
+ c_data = association_proxy("c", "csub_only_data")
+
+ def _assert_raises_ambiguous(self, fn, *arg, **kw):
+ assert_raises_message(
+ AttributeError,
+ "Association proxy D.c refers to an attribute 'csub_only_data'",
+ fn, *arg, **kw
+ )
+
def test_column_collection_expressions(self):
B, C, C2 = self.classes("B", "C", "C2")
"AND (d.value LIKE '%' || :value_1 || '%'))"
)
+ def test_subclass_only_owner_none(self):
+ D, C, C2 = self.classes("D", "C", "C2")
+
+ d1 = D()
+ self._assert_raises_ambiguous(
+ getattr, d1, 'c_data'
+ )
+
+ def test_subclass_only_owner_assign(self):
+ D, C, C2 = self.classes("D", "C", "C2")
+
+ d1 = D(c=C2())
+ d1.c_data = 'some c2'
+ eq_(d1.c_data, "some c2")
+
+ def test_subclass_only_owner_get(self):
+ D, C, C2 = self.classes("D", "C", "C2")
+
+ d1 = D(c=C2(csub_only_data='some c2'))
+ eq_(d1.c_data, "some c2")
+
+ def test_subclass_only_owner_none_raise(self):
+ D, C, C2 = self.classes("D", "C", "C2")
+
+ d1 = D()
+ self._assert_raises_ambiguous(
+ getattr, d1, "c_data"
+ )
+
+ def test_subclass_only_owner_delete(self):
+ D, C, C2 = self.classes("D", "C", "C2")
+
+ d1 = D(c=C2(csub_only_data='some c2'))
+ del d1.c_data
+
+ self._assert_raises_ambiguous(
+ getattr, d1, "c_data"
+ )
+
+ def test_subclass_only_owner_assign_raises(self):
+ D, C, C2 = self.classes("D", "C", "C2")
+
+ d1 = D(c=C())
+ self._assert_raises_ambiguous(
+ setattr, d1, "c_data", 'some c1'
+ )
+
+ def test_subclass_only_owner_get_raises(self):
+ D, C, C2 = self.classes("D", "C", "C2")
+
+ d1 = D(c=C())
+ self._assert_raises_ambiguous(
+ getattr, d1, "c_data"
+ )
+
+ def test_subclass_only_owner_delete_raises(self):
+ D, C, C2 = self.classes("D", "C", "C2")
+
+ d1 = D(c=C2(csub_only_data='some c2'))
+ eq_(d1.c_data, "some c2")
+
+ # now switch
+ d1.c = C()
+
+ self._assert_raises_ambiguous(
+ delattr, d1, "c_data"
+ )
+
+ def test_subclasses_conflicting_types(self):
+ B, D, C, C1, C2 = self.classes("B", "D", "C", "C1", "C2")
+
+ bs = [B(), B()]
+ d1 = D(c=C1(csub_only_data=bs))
+ d2 = D(c=C2(csub_only_data='some c2'))
+
+ association_proxy_object = inspect(D).all_orm_descriptors['c_data']
+ inst1 = association_proxy_object.for_class(D, d1)
+ inst2 = association_proxy_object.for_class(D, d2)
+
+ eq_(inst1._target_is_object, True)
+ eq_(inst2._target_is_object, False)
+
+ # both instances are cached
+ inst0 = association_proxy_object.for_class(D)
+ eq_(inst0._lookup_cache, {C1: inst1, C2: inst2})
+
+ # cache works
+ is_(association_proxy_object.for_class(D, d1), inst1)
+ is_(association_proxy_object.for_class(D, d2), inst2)
+
+ def test_col_expressions_not_available(self):
+ D, = self.classes("D")
+
+ self._assert_raises_ambiguous(
+ lambda: D.c_data == 5
+ )
+
+ def test_rel_expressions_not_available(self):
+ B, D, = self.classes("B", "D")
+
+ self._assert_raises_ambiguous(
+ lambda: D.c_data.any(B.id == 5)
+ )
+
class ScopeBehaviorTest(fixtures.DeclarativeMappedTest):
# test some GC scenarios, including issue #4268