.. changelog::
:version: 1.2.0b1
+ .. change:: 3769
+ :tags: bug, ext
+ :tickets: 3769
+
+ The :meth:`.AssociationProxy.any`, :meth:`.AssociationProxy.has`
+ and :meth:`.AssociationProxy.contains` comparison methods now support
+ linkage to an attribute that is itself also an
+ :class:`.AssociationProxy`, recursively.
+
+ .. seealso::
+
+ :ref:`change_3769`
+
.. change:: 3853
:tags: bug, ext
:tickets: 3853
:ticket:`3853`
+.. _change_3769:
+
+AssociationProxy any(), has(), contains() work with chained association proxies
+-------------------------------------------------------------------------------
+
+The :meth:`.AssociationProxy.any`, :meth:`.AssociationProxy.has`
+and :meth:`.AssociationProxy.contains` comparison methods now support
+linkage to an attribute that is
+itself also an :class:`.AssociationProxy`, recursively. Below, ``A.b_values``
+is an association proxy that links to ``AtoB.bvalue``, which is
+itself an association proxy onto ``B``::
+
+ class A(Base):
+ __tablename__ = 'a'
+ id = Column(Integer, primary_key=True)
+
+ b_values = association_proxy("atob", "b_value")
+ c_values = association_proxy("atob", "c_value")
+
+
+ class B(Base):
+ __tablename__ = 'b'
+ id = Column(Integer, primary_key=True)
+ a_id = Column(ForeignKey('a.id'))
+ value = Column(String)
+
+ c = relationship("C")
+
+
+ class C(Base):
+ __tablename__ = 'c'
+ id = Column(Integer, primary_key=True)
+ b_id = Column(ForeignKey('b.id'))
+ value = Column(String)
+
+
+ class AtoB(Base):
+ __tablename__ = 'atob'
+
+ a_id = Column(ForeignKey('a.id'), primary_key=True)
+ b_id = Column(ForeignKey('b.id'), primary_key=True)
+
+ a = relationship("A", backref="atob")
+ b = relationship("B", backref="atob")
+
+ b_value = association_proxy("b", "value")
+ c_value = association_proxy("b", "c")
+
+We can query on ``A.b_values`` using :meth:`.AssociationProxy.contains` to
+query across the two proxies ``A.b_values``, ``AtoB.b_value``:
+
+.. sourcecode:: pycon+sql
+
+ >>> s.query(A).filter(A.b_values.contains('hi')).all()
+ {opensql}SELECT a.id AS a_id
+ FROM a
+ WHERE EXISTS (SELECT 1
+ FROM atob
+ WHERE a.id = atob.a_id AND (EXISTS (SELECT 1
+ FROM b
+ WHERE b.id = atob.b_id AND b.value = :value_1)))
+
+Similarly, we can query on ``A.c_values`` using :meth:`.AssociationProxy.any`
+to query across the two proxies ``A.c_values``, ``AtoB.c_value``:
+
+.. sourcecode:: pycon+sql
+
+ >>> s.query(A).filter(A.c_values.any(value='x')).all()
+ {opensql}SELECT a.id AS a_id
+ FROM a
+ WHERE EXISTS (SELECT 1
+ FROM atob
+ WHERE a.id = atob.a_id AND (EXISTS (SELECT 1
+ FROM b
+ WHERE b.id = atob.b_id AND (EXISTS (SELECT 1
+ FROM c
+ WHERE b.id = c.b_id AND c.value = :value_1)))))
+
+:ticket:`3769`
+
New Features and Improvements - Core
====================================
def _comparator(self):
return self._get_property().comparator
+ @util.memoized_property
+ def _unwrap_target_assoc_proxy(self):
+ attr = getattr(self.target_class, self.value_attr)
+ if isinstance(attr, AssociationProxy):
+ return attr
+ return None
+
+ def _criterion_exists(self, criterion=None, **kwargs):
+ is_has = kwargs.pop('is_has', None)
+
+ target_assoc = self._unwrap_target_assoc_proxy
+ if target_assoc is not None:
+ inner = target_assoc._criterion_exists(
+ criterion=criterion, **kwargs)
+ return self._comparator._criterion_exists(inner)
+
+ if self._target_is_object:
+ prop = getattr(self.target_class, self.value_attr)
+ value_expr = prop._criterion_exists(criterion, **kwargs)
+ else:
+ if kwargs:
+ raise exc.ArgumentError(
+ "Can't apply keyword arguments to column-targeted "
+ "association proxy; use =="
+ )
+ elif is_has and criterion is not None:
+ raise exc.ArgumentError(
+ "Non-empty has() not allowed for "
+ "column-targeted association proxy; use =="
+ )
+
+ value_expr = criterion
+
+ return self._comparator._criterion_exists(value_expr)
+
def any(self, criterion=None, **kwargs):
"""Produce a proxied 'any' expression using EXISTS.
operators of the underlying proxied attributes.
"""
- if self._target_is_object:
- if self._value_is_scalar:
- value_expr = getattr(
- self.target_class, self.value_attr).has(
- criterion, **kwargs)
- else:
- value_expr = getattr(
- self.target_class, self.value_attr).any(
- criterion, **kwargs)
- else:
- value_expr = criterion
-
- # check _value_is_scalar here, otherwise
- # we're scalar->scalar - call .any() so that
- # the "can't call any() on a scalar" msg is raised.
- if self.scalar and not self._value_is_scalar:
- return self._comparator.has(
- value_expr
- )
- else:
- return self._comparator.any(
- value_expr
+ if self._unwrap_target_assoc_proxy is None and (
+ self.scalar and (
+ not self._target_is_object or self._value_is_scalar)
+ ):
+ raise exc.InvalidRequestError(
+ "'any()' not implemented for scalar "
+ "attributes. Use has()."
)
+ return self._criterion_exists(
+ criterion=criterion, is_has=False, **kwargs)
def has(self, criterion=None, **kwargs):
"""Produce a proxied 'has' expression using EXISTS.
operators of the underlying proxied attributes.
"""
-
- if self._target_is_object:
- return self._comparator.has(
- getattr(self.target_class, self.value_attr).
- has(criterion, **kwargs)
- )
- else:
- if criterion is not None or kwargs:
- raise exc.ArgumentError(
- "Non-empty has() not allowed for "
- "column-targeted association proxy; use ==")
- return self._comparator.has()
+ if self._unwrap_target_assoc_proxy is None and (
+ not self.scalar or (
+ self._target_is_object and not self._value_is_scalar)
+ ):
+ raise exc.InvalidRequestError(
+ "'has()' not implemented for collections. "
+ "Use any().")
+ return self._criterion_exists(
+ criterion=criterion, is_has=True, **kwargs)
def contains(self, obj):
"""Produce a proxied 'contains' expression using EXISTS.
operators of the underlying proxied attributes.
"""
- if self.scalar and not self._value_is_scalar:
+ target_assoc = self._unwrap_target_assoc_proxy
+ if target_assoc is not None:
+ return self._comparator._criterion_exists(
+ target_assoc.contains(obj)
+ )
+ elif self._target_is_object and self.scalar and \
+ not self._value_is_scalar:
return self._comparator.has(
getattr(self.target_class, self.value_attr).contains(obj)
)
+ elif self._target_is_object and self.scalar and \
+ self._value_is_scalar:
+ raise exc.InvalidRequestError(
+ "contains() doesn't apply to a scalar endpoint; use ==")
else:
- return self._comparator.any(**{self.value_attr: obj})
+
+ return self._comparator._criterion_exists(**{self.value_attr: obj})
def __eq__(self, obj):
# note the has() here will fail for collections; eq_()
return self._comparator.has(
getattr(self.target_class, self.value_attr) != obj)
+ def __repr__(self):
+ return "AssociationProxy(%r, %r)" % (self.target_collection, self.value_attr)
class _lazy_collection(object):
def __init__(self, obj, target):
# o2m -> scalar
singular_collection = association_proxy('user_keywords', 'value')
+ # uselist assoc_proxy -> assoc_proxy -> obj
+ common_users = association_proxy("user_keywords", "common_users")
+
+ # non uselist assoc_proxy -> assoc_proxy -> obj
+ common_singular = association_proxy("singular", "keyword")
+
+ # non uselist assoc_proxy -> assoc_proxy -> scalar
+ singular_keyword = association_proxy("singular", "keyword")
+
+ # uselist assoc_proxy -> assoc_proxy -> scalar
+ common_keyword_name = association_proxy("user_keywords", "keyword_name")
+
class Keyword(cls.Comparable):
def __init__(self, keyword):
self.keyword = keyword
self.user = user
self.keyword = keyword
+ common_users = association_proxy("keyword", "user")
+ keyword_name = association_proxy("keyword", "keyword")
+
class Singular(cls.Comparable):
def __init__(self, value=None):
self.value = value
+ keyword = association_proxy("keywords", "keyword")
+
@classmethod
def setup_mappers(cls):
users, Keyword, UserKeyword, singular, \
)
)
+ def test_filter_any_chained(self):
+ User = self.classes.User
+
+ UserKeyword, User = self.classes.UserKeyword, self.classes.User
+ Keyword = self.classes.Keyword
+
+ q1 = self.session.query(User).filter(
+ User.common_users.any(User.name == 'user7')
+ )
+ self.assert_compile(
+ q1,
+
+ "SELECT users.id AS users_id, users.name AS users_name, "
+ "users.singular_id AS users_singular_id "
+ "FROM users "
+ "WHERE EXISTS (SELECT 1 "
+ "FROM userkeywords "
+ "WHERE users.id = userkeywords.user_id AND (EXISTS (SELECT 1 "
+ "FROM keywords "
+ "WHERE keywords.id = userkeywords.keyword_id AND (EXISTS (SELECT 1 "
+ "FROM userkeywords "
+ "WHERE keywords.id = userkeywords.keyword_id AND (EXISTS (SELECT 1 "
+ "FROM users "
+ "WHERE users.id = userkeywords.user_id AND users.name = :name_1)))))))",
+ checkparams={'name_1': 'user7'}
+ )
+
+ q2 = self.session.query(User).filter(
+ User.user_keywords.any(
+ UserKeyword.keyword.has(
+ Keyword.user_keyword.has(
+ UserKeyword.user.has(
+ User.name == 'user7'
+ )
+ )
+ )))
+ self._equivalent(q1, q2)
+
+ def test_filter_has_chained_has_to_any(self):
+ User = self.classes.User
+ Singular = self.classes.Singular
+ Keyword = self.classes.Keyword
+
+ q1 = self.session.query(User).filter(
+ User.common_singular.has(Keyword.keyword == 'brown')
+ )
+ self.assert_compile(
+ q1,
+
+ "SELECT users.id AS users_id, users.name AS users_name, "
+ "users.singular_id AS users_singular_id "
+ "FROM users "
+ "WHERE EXISTS (SELECT 1 "
+ "FROM singular "
+ "WHERE singular.id = users.singular_id AND (EXISTS (SELECT 1 "
+ "FROM keywords "
+ "WHERE singular.id = keywords.singular_id AND "
+ "keywords.keyword = :keyword_1)))",
+ checkparams={'keyword_1': 'brown'}
+ )
+
+ q2 = self.session.query(User).filter(
+ User.singular.has(
+ Singular.keywords.any(Keyword.keyword == 'brown')))
+ self._equivalent(q1, q2)
+
+ def test_filter_has_scalar_raises(self):
+ User = self.classes.User
+ assert_raises_message(
+ exc.ArgumentError,
+ r"Can't apply keyword arguments to column-targeted",
+ User.singular_keyword.has, keyword="brown"
+ )
+
+ def test_filter_contains_chained_has_to_any(self):
+ User = self.classes.User
+ Keyword = self.classes.Keyword
+ Singular = self.classes.Singular
+
+ q1 = self.session.query(User).filter(
+ User.singular_keyword.contains("brown")
+ )
+ self.assert_compile(
+ q1,
+ "SELECT users.id AS users_id, users.name AS users_name, "
+ "users.singular_id AS users_singular_id "
+ "FROM users "
+ "WHERE EXISTS (SELECT 1 "
+ "FROM singular "
+ "WHERE singular.id = users.singular_id AND (EXISTS (SELECT 1 "
+ "FROM keywords "
+ "WHERE singular.id = keywords.singular_id "
+ "AND keywords.keyword = :keyword_1)))",
+ checkparams={'keyword_1': 'brown'}
+ )
+ q2 = self.session.query(User).filter(
+ User.singular.has(
+ Singular.keywords.any(
+ Keyword.keyword == 'brown'
+ )
+ )
+ )
+
+ self._equivalent(q1, q2)
+
+ def test_filter_contains_chained_any_to_has(self):
+ User = self.classes.User
+ Keyword = self.classes.Keyword
+ UserKeyword = self.classes.UserKeyword
+
+ q1 = self.session.query(User).filter(
+ User.common_keyword_name.contains("brown")
+ )
+ self.assert_compile(
+ q1,
+ "SELECT users.id AS users_id, users.name AS users_name, "
+ "users.singular_id AS users_singular_id "
+ "FROM users "
+ "WHERE EXISTS (SELECT 1 "
+ "FROM userkeywords "
+ "WHERE users.id = userkeywords.user_id AND (EXISTS (SELECT 1 "
+ "FROM keywords "
+ "WHERE keywords.id = userkeywords.keyword_id AND "
+ "keywords.keyword = :keyword_1)))",
+ checkparams={'keyword_1': 'brown'}
+ )
+
+ q2 = self.session.query(User).filter(
+ User.user_keywords.any(
+ UserKeyword.keyword.has(Keyword.keyword == "brown")
+ )
+ )
+ self._equivalent(q1, q2)
+
def test_has_criterion_nul(self):
# but we don't allow that with any criterion...
User = self.classes.User
assert_raises_message(
exc.ArgumentError,
- r"Non-empty has\(\) not allowed",
+ r"Can't apply keyword arguments to column-targeted",
User.singular_value.has, singular_value="singular4"
)